MessageQueue with Websocket
How to use this library
for Cloud
How to initialize
package main
import (
	"context"
	"log"

	"github.com/spf13/pflag"
	wsServe "github.com/zhepoch/wsmessagequeue/cloud"
	"github.com/zhepoch/wsmessagequeue/cloud/dbCloud"
)
var (
wsSqlitePath = pflag.String("ws-sqlite-path",
"cloud.sqlite", "storage websocket sqlite file path")
rootCtx = context.Background()
hub *wsServe.Hub
)
// handlerMessage is the one-way message callback handed to wsServe.NewHub in
// main. It writes the raw payload to the standard logger and always returns
// nil.
func handlerMessage(message []byte) error {
	payload := string(message)
	log.Println(payload)
	return nil
}
// handlerCallbackMessage is the request/reply callback handed to
// wsServe.NewHub in main. It logs the incoming payload and returns a fixed
// reply with a nil error.
//
// This handler runs in the cloud program, so the reply is labelled as coming
// from the cloud; the previous text ("response from edge") was copied from
// the edge example and mislabelled the sender.
func handlerCallbackMessage(message []byte) ([]byte, error) {
	log.Println(string(message))
	return []byte("response from cloud"), nil
}
func main() {
if err := dbCloud.InitDB(*wsSqlitePath); err != nil {
log.Fatalf("Initial websocket database failed: %v", err)
}
hubConfig := wsServe.NewDefaultConfig()
hubConfig.SetMaxMessageSize(1024000)
hub = wsServe.NewHub(rootCtx, hubConfig, handlerMessage, handlerCallbackMessage, false/*clearHistoryClients*/)
if err := hub.AddEdgeEvent(); err != nil {
log.Fatalf("Auto connection edge form database failed: %v", err)
}
}
How to AddEdge and DeleteEdge
clientInfo := &dbCloud.ClientInfo{
ID: "test_client",
IP: "127.0.0.1",
Port: 8080,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
_ = clientInfo.UpdateOrCreate()
_ = hub.AddEdgeEvent()
// Remove an edge: delete its client record and its stored send-message
// records, then tell the hub. Every step is attempted even if an earlier one
// fails, and each failure is logged rather than discarded.
edgeID := "test_client"
if err := dbCloud.DeleteClientInfo(edgeID); err != nil {
	log.Printf("delete client info of edge %q failed: %v", edgeID, err)
}
if err := dbCloud.DeleteSendMsgInfoByClientID(edgeID); err != nil {
	log.Printf("delete send messages of edge %q failed: %v", edgeID, err)
}
if err := hub.DeleteEdgeEvent(edgeID); err != nil {
	log.Printf("delete edge %q from hub failed: %v", edgeID, err)
}
How to send message to edge
// Payload and target shared by the three examples below. edgeID is declared
// once; redeclaring it with := in the same scope does not compile.
messageByteJson := []byte(`{"hello":"world"}`)
edgeID := "test_client"

// send message to all
if err := hub.SendToAll(messageByteJson); err != nil {
	log.Printf("send to all failed: %v", err)
}

// send message to the specific client
if err := hub.SendOne(edgeID, messageByteJson); err != nil {
	log.Printf("send to edge %q failed: %v", edgeID, err)
}

// send message and wait up to five seconds for the response
reply, err := hub.SendOneAndReply(edgeID, messageByteJson, 5*time.Second)
log.Println(string(reply), err)
for Edge
package main
import (
"fmt"
"log"
"net/http"
"github.com/spf13/pflag"
wsEdge "github.com/zhepoch/wsmessagequeue/edge"
"github.com/zhepoch/wsmessagequeue/edge/dbEdge"
utilLog "github.com/zhepoch/wsmessagequeue/utils/log"
)
var (
wsSqlitePath = pflag.String("ws-sqlite-path", "edge.sqlite", "storage websocket sqlite file path")
wsListenPort = pflag.Int("ws-listen-port", 8090, "edge ws listen port")
rootCtx = context.Background()
wsCliManager *wsEdge.ClientManager
)
// handlerMessage is the one-way message callback handed to
// wsEdge.RunWsServer in main. It logs the raw payload as text and reports
// success unconditionally.
func handlerMessage(message []byte) error {
	text := string(message)
	log.Println(text)
	return nil
}
// handlerCallbackMessage is the request/reply callback handed to
// wsEdge.RunWsServer in main. It logs the incoming payload and answers with
// a fixed reply and a nil error.
func handlerCallbackMessage(message []byte) ([]byte, error) {
	const reply = "response from edge"

	log.Println(string(message))
	return []byte(reply), nil
}
func main() {
if err := dbEdge.InitDB(*wsSqlitePath); err != nil {
log.Fatalf("Initial websocket database failed: %v", err)
}
wsCliManager = &wsEdge.ClientManager{
WsCli: nil,
Usable: false,
}
go func() {
config := wsEdge.NewDefaultConfig()
config.SetMaxMessageSize(1024000)
config.SetLogger(utilLog.NewGlogLogger())
err := wsEdge.RunWsServer(fmt.Sprintf("0.0.0.0:%d", *wsListenPort), wsCliManager, config, "", handlerMessage, handlerCallbackMessage)
if err != nil && err != http.ErrServerClosed {
log.Fatalf("failed to start websocket servr: %v\n", err)
}
}()
}
How to send message to cloud
messageByteJson := []byte(`{"hello":"world"}`)

// Hold the manager's lock while checking Usable and using WsCli.
// NOTE(review): the lock stays held for the whole reply wait (up to five
// seconds) — confirm that is acceptable for other users of the manager.
wsCliManager.Lock()
defer wsCliManager.Unlock()

if wsCliManager.Usable {
	// One-way send. Check its error here; previously it was overwritten by
	// the next call's result without ever being inspected.
	if err := wsCliManager.WsCli.SendMessage(messageByteJson); err != nil {
		log.Printf("send message to cloud failed: %v", err)
	}

	// Send and wait up to five seconds for the reply.
	reply, err := wsCliManager.WsCli.SendMessageAndReply(messageByteJson, 5*time.Second)
	log.Println(string(reply), err)
}
For more usage, please see the example folder.