msggateway

package
v3.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2024 License: Apache-2.0 Imports: 53 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MessageText is for UTF-8 encoded text messages like JSON.
	MessageText = iota + 1
	// MessageBinary is for binary messages like protobufs.
	MessageBinary
	// CloseMessage denotes a close control message. The optional message
	// payload contains a numeric code and text. Use the FormatCloseMessage
	// function to format a close message payload.
	CloseMessage = 8

	// PingMessage denotes a ping control message. The optional message payload
	// is UTF-8 encoded text.
	PingMessage = 9

	// PongMessage denotes a pong control message. The optional message payload
	// is UTF-8 encoded text.
	PongMessage = 10
)
View Source
const (
	WsUserID                = "sendID"
	CommonUserID            = "userID"
	PlatformID              = "platformID"
	ConnID                  = "connID"
	Token                   = "token"
	OperationID             = "operationID"
	Compression             = "compression"
	GzipCompressionProtocol = "gzip"
	BackgroundStatus        = "isBackground"
	SendResponse            = "isMsgResp"
)
View Source
const (
	// Websocket Protocol.
	WSGetNewestSeq        = 1001
	WSPullMsgBySeqList    = 1002
	WSSendMsg             = 1003
	WSSendSignalMsg       = 1004
	WSPushMsg             = 2001
	WSKickOnlineMsg       = 2002
	WsLogoutMsg           = 2003
	WsSetBackgroundStatus = 2004
	WsSubUserOnlineStatus = 2005
	WSDataError           = 3001
)
View Source
const (
	WebSocket = iota + 1
)

Variables

View Source
var (
	ErrConnClosed                = errs.New("conn has closed")
	ErrNotSupportMessageProtocol = errs.New("not support message protocol")
	ErrClientClosed              = errs.New("client actively close the connection")
	ErrPanic                     = errs.New("panic error")
)

Functions

func Start added in v3.7.0

func Start(ctx context.Context, index int, conf *Config) error

Start run ws server.

Types

type Client

type Client struct {
	PlatformID   int    `json:"platformID"`
	IsCompress   bool   `json:"isCompress"`
	UserID       string `json:"userID"`
	IsBackground bool   `json:"isBackground"`
	// contains filtered or unexported fields
}

func (*Client) KickOnlineMessage

func (c *Client) KickOnlineMessage() error

func (*Client) PushMessage

func (c *Client) PushMessage(ctx context.Context, msgData *sdkws.MsgData) error

func (*Client) PushUserOnlineStatus added in v3.8.0

func (c *Client) PushUserOnlineStatus(data []byte) error

func (*Client) ResetClient

func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer)

ResetClient updates the client's state with new connection and context information.

type Compressor

type Compressor interface {
	Compress(rawData []byte) ([]byte, error)
	CompressWithPool(rawData []byte) ([]byte, error)
	DeCompress(compressedData []byte) ([]byte, error)
	DecompressWithPool(compressedData []byte) ([]byte, error)
}

type Config added in v3.7.0

type Config struct {
	MsgGateway     config.MsgGateway
	Share          config.Share
	RedisConfig    config.Redis
	WebhooksConfig config.Webhooks
	Discovery      config.Discovery
}

type Encoder

type Encoder interface {
	Encode(data any) ([]byte, error)
	Decode(encodeData []byte, decodeData any) error
}

type GWebSocket

type GWebSocket struct {
	// contains filtered or unexported fields
}

func (*GWebSocket) Close

func (d *GWebSocket) Close() error

func (*GWebSocket) Dial

func (d *GWebSocket) Dial(urlStr string, requestHeader http.Header) (*http.Response, error)

func (*GWebSocket) GenerateLongConn

func (d *GWebSocket) GenerateLongConn(w http.ResponseWriter, r *http.Request) error

func (*GWebSocket) IsNil

func (d *GWebSocket) IsNil() bool

func (*GWebSocket) ReadMessage

func (d *GWebSocket) ReadMessage() (int, []byte, error)

func (*GWebSocket) RespondWithError added in v3.7.0

func (d *GWebSocket) RespondWithError(err error, w http.ResponseWriter, r *http.Request) error

func (*GWebSocket) RespondWithSuccess added in v3.7.0

func (d *GWebSocket) RespondWithSuccess() error

func (*GWebSocket) SetConnNil

func (d *GWebSocket) SetConnNil()

func (*GWebSocket) SetPingHandler added in v3.3.2

func (d *GWebSocket) SetPingHandler(handler PingPongHandler)

func (*GWebSocket) SetPongHandler

func (d *GWebSocket) SetPongHandler(handler PingPongHandler)

func (*GWebSocket) SetReadDeadline

func (d *GWebSocket) SetReadDeadline(timeout time.Duration) error

func (*GWebSocket) SetReadLimit

func (d *GWebSocket) SetReadLimit(limit int64)

func (*GWebSocket) SetWriteDeadline

func (d *GWebSocket) SetWriteDeadline(timeout time.Duration) error

func (*GWebSocket) WriteMessage

func (d *GWebSocket) WriteMessage(messageType int, message []byte) error

type GobEncoder

type GobEncoder struct{}

func NewGobEncoder

func NewGobEncoder() *GobEncoder

func (*GobEncoder) Decode

func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error

func (*GobEncoder) Encode

func (g *GobEncoder) Encode(data any) ([]byte, error)

type GrpcHandler

type GrpcHandler struct {
	// contains filtered or unexported fields
}

func NewGrpcHandler

func NewGrpcHandler(validate *validator.Validate, client discovery.SvcDiscoveryRegistry, rpcRegisterName *config.RpcRegisterName) *GrpcHandler

func (GrpcHandler) GetSeq

func (g GrpcHandler) GetSeq(ctx context.Context, data *Req) ([]byte, error)

func (GrpcHandler) PullMessageBySeqList

func (g GrpcHandler) PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)

func (GrpcHandler) SendMessage

func (g GrpcHandler) SendMessage(ctx context.Context, data *Req) ([]byte, error)

SendMessage handles the sending of messages through gRPC. It unmarshals the request data, validates the message, and then sends it using the message RPC client.

func (GrpcHandler) SendSignalMessage

func (g GrpcHandler) SendSignalMessage(context context.Context, data *Req) ([]byte, error)

func (GrpcHandler) SetUserDeviceBackground

func (g GrpcHandler) SetUserDeviceBackground(_ context.Context, data *Req) ([]byte, bool, error)

func (GrpcHandler) UserLogout

func (g GrpcHandler) UserLogout(context context.Context, data *Req) ([]byte, error)

type GzipCompressor

type GzipCompressor struct {
	// contains filtered or unexported fields
}

func NewGzipCompressor

func NewGzipCompressor() *GzipCompressor

func (*GzipCompressor) Compress

func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error)

func (*GzipCompressor) CompressWithPool added in v3.4.2

func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error)

func (*GzipCompressor) DeCompress

func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error)

func (*GzipCompressor) DecompressWithPool added in v3.4.2

func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error)

type LongConn

type LongConn interface {
	// Close this connection
	Close() error
	// WriteMessage Write message to connection,messageType means data type,can be set binary(2) and text(1).
	WriteMessage(messageType int, message []byte) error
	// ReadMessage Read message from connection.
	ReadMessage() (int, []byte, error)
	// SetReadDeadline sets the read deadline on the underlying network connection,
	// after a read has timed out, will return an error.
	SetReadDeadline(timeout time.Duration) error
	// SetWriteDeadline sets to write deadline when send message,when read has timed out,will return error.
	SetWriteDeadline(timeout time.Duration) error
	// Dial Try to dial a connection,url must set auth args,header can control compress data
	Dial(urlStr string, requestHeader http.Header) (*http.Response, error)
	// IsNil Whether the connection of the current long connection is nil
	IsNil() bool
	// SetConnNil Set the connection of the current long connection to nil
	SetConnNil()
	// SetReadLimit sets the maximum size for a message read from the peer.bytes
	SetReadLimit(limit int64)
	SetPongHandler(handler PingPongHandler)
	SetPingHandler(handler PingPongHandler)
	// GenerateLongConn Check the connection of the current and when it was sent are the same
	GenerateLongConn(w http.ResponseWriter, r *http.Request) error
}

type LongConnServer

type LongConnServer interface {
	Run(done chan error) error

	GetUserAllCons(userID string) ([]*Client, bool)
	GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)
	Validate(s any) error
	SetDiscoveryRegistry(client discovery.SvcDiscoveryRegistry, config *Config)
	KickUserConn(client *Client) error
	UnRegister(c *Client)
	SetKickHandlerInfo(i *kickHandler)
	SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
	Compressor
	Encoder
	MessageHandler
	// contains filtered or unexported methods
}

type MessageHandler

type MessageHandler interface {
	GetSeq(context context.Context, data *Req) ([]byte, error)
	SendMessage(context context.Context, data *Req) ([]byte, error)
	SendSignalMessage(context context.Context, data *Req) ([]byte, error)
	PullMessageBySeqList(context context.Context, data *Req) ([]byte, error)
	UserLogout(context context.Context, data *Req) ([]byte, error)
	SetUserDeviceBackground(context context.Context, data *Req) ([]byte, bool, error)
}

type Option

type Option func(opt *configs)

func WithHandshakeTimeout

func WithHandshakeTimeout(t time.Duration) Option

func WithMaxConnNum

func WithMaxConnNum(num int64) Option

func WithMessageMaxMsgLength

func WithMessageMaxMsgLength(length int) Option

func WithPort

func WithPort(port int) Option

func WithWriteBufferSize added in v3.4.2

func WithWriteBufferSize(size int) Option

type PingPongHandler added in v3.3.2

type PingPongHandler func(string) error

type Req

type Req struct {
	ReqIdentifier int32  `json:"reqIdentifier" validate:"required"`
	Token         string `json:"token"`
	SendID        string `json:"sendID"        validate:"required"`
	OperationID   string `json:"operationID"   validate:"required"`
	MsgIncr       string `json:"msgIncr"       validate:"required"`
	Data          []byte `json:"data"`
}

func (*Req) String

func (r *Req) String() string

type Resp

type Resp struct {
	ReqIdentifier int32  `json:"reqIdentifier"`
	MsgIncr       string `json:"msgIncr"`
	OperationID   string `json:"operationID"`
	ErrCode       int    `json:"errCode"`
	ErrMsg        string `json:"errMsg"`
	Data          []byte `json:"data"`
}

func (*Resp) String

func (r *Resp) String() string

type Server

type Server struct {
	LongConnServer LongConnServer
	// contains filtered or unexported fields
}

func NewServer

func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server

func (*Server) InitServer

func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error

func (*Server) KickUserOffline

func (*Server) OnlinePushMsg

func (s *Server) OnlinePushMsg(
	context context.Context,
	req *msggateway.OnlinePushMsgReq,
) (*msggateway.OnlinePushMsgResp, error)

func (*Server) SetLongConnServer

func (s *Server) SetLongConnServer(LongConnServer LongConnServer)

func (*Server) Start

func (s *Server) Start(ctx context.Context, index int, conf *Config) error

type Subscription added in v3.8.0

type Subscription struct {
	// contains filtered or unexported fields
}

func (*Subscription) DelClient added in v3.8.0

func (s *Subscription) DelClient(client *Client)

func (*Subscription) GetClient added in v3.8.0

func (s *Subscription) GetClient(userID string) []*Client

func (*Subscription) Sub added in v3.8.0

func (s *Subscription) Sub(client *Client, addUserIDs, delUserIDs []string)

type UserConnContext

type UserConnContext struct {
	RespWriter http.ResponseWriter
	Req        *http.Request
	Path       string
	Method     string
	RemoteAddr string
	ConnID     string
}

func (*UserConnContext) Deadline

func (c *UserConnContext) Deadline() (deadline time.Time, ok bool)

func (*UserConnContext) Done

func (c *UserConnContext) Done() <-chan struct{}

func (*UserConnContext) Err

func (c *UserConnContext) Err() error

func (*UserConnContext) ErrReturn

func (c *UserConnContext) ErrReturn(error string, code int)

func (*UserConnContext) GetBackground

func (c *UserConnContext) GetBackground() bool

func (*UserConnContext) GetCompression added in v3.7.0

func (c *UserConnContext) GetCompression() bool

func (*UserConnContext) GetConnID

func (c *UserConnContext) GetConnID() string

func (*UserConnContext) GetHeader

func (c *UserConnContext) GetHeader(key string) (string, bool)

func (*UserConnContext) GetOperationID

func (c *UserConnContext) GetOperationID() string

func (*UserConnContext) GetPlatformID

func (c *UserConnContext) GetPlatformID() string

func (*UserConnContext) GetRemoteAddr

func (c *UserConnContext) GetRemoteAddr() string

func (*UserConnContext) GetToken

func (c *UserConnContext) GetToken() string

func (*UserConnContext) GetUserID

func (c *UserConnContext) GetUserID() string

func (*UserConnContext) ParseEssentialArgs added in v3.7.0

func (c *UserConnContext) ParseEssentialArgs() error

func (*UserConnContext) Query

func (c *UserConnContext) Query(key string) (string, bool)

func (*UserConnContext) SetHeader

func (c *UserConnContext) SetHeader(key, value string)

func (*UserConnContext) SetOperationID

func (c *UserConnContext) SetOperationID(operationID string)

func (*UserConnContext) SetToken

func (c *UserConnContext) SetToken(token string)

func (*UserConnContext) ShouldSendResp added in v3.7.0

func (c *UserConnContext) ShouldSendResp() bool

func (*UserConnContext) Value

func (c *UserConnContext) Value(key any) any

type UserMap

type UserMap interface {
	GetAll(userID string) ([]*Client, bool)
	Get(userID string, platformID int) ([]*Client, bool, bool)
	Set(userID string, v *Client)
	DeleteClients(userID string, clients []*Client) (isDeleteUser bool)
	UserState() <-chan UserState
	GetAllUserStatus(deadline time.Time, nowtime time.Time) []UserState
	RecvSubChange(userID string, platformIDs []int32) bool
}

type UserPlatform added in v3.8.0

type UserPlatform struct {
	Time    time.Time
	Clients []*Client
}

func (*UserPlatform) PlatformIDSet added in v3.8.0

func (u *UserPlatform) PlatformIDSet() map[int32]struct{}

func (*UserPlatform) PlatformIDs added in v3.8.0

func (u *UserPlatform) PlatformIDs() []int32

type UserState added in v3.8.0

type UserState struct {
	UserID  string
	Online  []int32
	Offline []int32
}

type WsServer

type WsServer struct {
	Compressor
	Encoder
	MessageHandler
	// contains filtered or unexported fields
}

func NewWsServer

func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer

func (*WsServer) ChangeOnlineStatus added in v3.8.0

func (ws *WsServer) ChangeOnlineStatus(concurrent int)

func (*WsServer) GetUserAllCons

func (ws *WsServer) GetUserAllCons(userID string) ([]*Client, bool)

func (*WsServer) GetUserPlatformCons

func (ws *WsServer) GetUserPlatformCons(userID string, platform int) ([]*Client, bool, bool)

func (*WsServer) KickUserConn

func (ws *WsServer) KickUserConn(client *Client) error

func (*WsServer) Run

func (ws *WsServer) Run(done chan error) error

func (*WsServer) SetDiscoveryRegistry

func (ws *WsServer) SetDiscoveryRegistry(disCov discovery.SvcDiscoveryRegistry, config *Config)

func (*WsServer) SetKickHandlerInfo

func (ws *WsServer) SetKickHandlerInfo(i *kickHandler)

func (*WsServer) SubUserOnlineStatus added in v3.8.0

func (ws *WsServer) SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)

func (*WsServer) UnRegister

func (ws *WsServer) UnRegister(c *Client)

func (*WsServer) Validate

func (ws *WsServer) Validate(_ any) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL