README
¶
WebSocket Service
This service provides an Http server supporting WebSocket protocol to send realtime events to user's interfaces. It is hooked to various internal Topics and contains most of the logic to decide whether to broadcast a message to a given connected user.
Service is listening by default on the [::]:5050/ws and [::]:5050/chat (see below) endpoints.
Currently only the ReactJS interface is using this connection to update in real-time.
Internal Events to Websocket Sessions
Sessions
Websocket server is implemented internaly using GIN and Melody libraries. WS connections are identified as "Sessions", and when a client is connecting, it must send a first "register:" message containing the current JWT token, as it would for any other public API in the micro-services architecture. When received, the JWT is parsed and the current user workspaces are stored in the session.
When an internal event arrives on the micro event bus, the websocket logic will map the various active sessions and generally use their username or list of workspaces to broadcast the event to each of them (or not).
Wired Events
- common.TOPIC_TREE_CHANGES : Sends tree events to dynamically update files and folders in the interface
- common.TOPIC_META_CHANGES : Same as TREE_CHANGES
- common.TOPIC_JOB_TASK_EVENT : Send Task events to show tasks progression
- common.TOPIC_IDM_EVENT : Identity Management are sent to user to trigger a reload of their roles and ACL's
- common.TOPIC_ACTIVITY_EVENT : Activities events will refresh events feeds and alerts
Chat Handler
A dedicated handler is listening on [::]:5050/chat and is specifically plugged to the internal CHAT topic to dynamically create rooms, connect users to them, etc. It provides a gateway for all chat operations, using the following protocol :
- Connection to websocket : as for the common ws, send a "subscribe" message with the JWT token
- Connection to a chat room: JOIN message, and a ChatRoom json representation (including chat type and chat Uuid. It will create the room if not already existing, and send back all the previous messages for this room
- Post a message : POST message, with ChatMessage and ChatRoom
- Disconnect from a room: LEAVE message with the ChatRoom representation.
Documentation
¶
Overview ¶
Package websocket starts a WebSocket service forwarding internal events to http clients
Index ¶
- Constants
- func ClearSession(session *melody.Session)
- func Marshal(m Message) []byte
- func NewErrorMessage(e error) []byte
- func NewErrorMessageString(e string) []byte
- func UpdateSessionFromClaims(session *melody.Session, claims claim.Claims, pool *views.ClientsPool)
- type ChatHandler
- func (c *ChatHandler) AppendUserToRoom(room *chat.ChatRoom, userName string)
- func (c *ChatHandler) BroadcastChatMessage(ctx context.Context, msg *chat.ChatEvent) error
- func (c *ChatHandler) FindOrCreateRoom(ctx context.Context, room *chat.ChatRoom, createIfNotExists bool) (*chat.ChatRoom, error)
- func (c *ChatHandler) InitHandlers(serviceCtx context.Context)
- func (c *ChatHandler) RemoveUserFromRoom(room *chat.ChatRoom, userName string)
- type ChatMessage
- type ChatMessageType
- type Message
- type MessageType
- type NodeChangeEventWithInfo
- type NodeEventsBatcher
- type WebsocketHandler
- func (w *WebsocketHandler) BroadcastActivityEvent(ctx context.Context, event *activity.PostActivityEvent) error
- func (w *WebsocketHandler) BroadcastIDMChangeEvent(ctx context.Context, event *idm.ChangeEvent) error
- func (w *WebsocketHandler) BroadcastNodeChangeEvent(ctx context.Context, event *NodeChangeEventWithInfo) error
- func (w *WebsocketHandler) BroadcastTaskChangeEvent(ctx context.Context, event *jobs.TaskChangeEvent) error
- func (w *WebsocketHandler) HandleNodeChangeEvent(ctx context.Context, event *tree.NodeChangeEvent) error
- func (w *WebsocketHandler) InitHandlers(serviceCtx context.Context)
Constants ¶
const LimiterBurst = 20
const LimiterRate = 30
const SessionClaimsKey = "profile"
const SessionLimiterKey = "limiter"
const SessionProfileKey = "profile"
const SessionRolesKey = "roles"
const (
SessionRoomKey = "room"
)
const SessionUsernameKey = "user"
const SessionWorkspacesKey = "workspaces"
Variables ¶
This section is empty.
Functions ¶
func ClearSession ¶
func NewErrorMessage ¶
func NewErrorMessageString ¶
func UpdateSessionFromClaims ¶
Types ¶
type ChatHandler ¶
type ChatHandler struct { Websocket *melody.Melody Pool *views.ClientsPool }
func NewChatHandler ¶
func NewChatHandler(serviceCtx context.Context) *ChatHandler
func (*ChatHandler) AppendUserToRoom ¶
func (c *ChatHandler) AppendUserToRoom(room *chat.ChatRoom, userName string)
func (*ChatHandler) BroadcastChatMessage ¶
func (*ChatHandler) FindOrCreateRoom ¶
func (*ChatHandler) InitHandlers ¶
func (c *ChatHandler) InitHandlers(serviceCtx context.Context)
func (*ChatHandler) RemoveUserFromRoom ¶
func (c *ChatHandler) RemoveUserFromRoom(room *chat.ChatRoom, userName string)
type ChatMessage ¶
type ChatMessage struct { Type ChatMessageType `json:"@type"` RoomType chat.RoomType Payload string }
type ChatMessageType ¶
type ChatMessageType string
const ( JoinRoom ChatMessageType = "join" LeaveRoom ChatMessageType = "leave" PostMessage ChatMessageType = "msg" )
type Message ¶
type Message struct { Type MessageType `json:"@type"` JWT string `json:"jwt"` Error string `json:"error"` }
Should pass JWT instead of username
type MessageType ¶
type MessageType string
const ( MsgSubscribe MessageType = "subscribe" MsgUnsubscribe MessageType = "unsubscribe" MsgError MessageType = "error" )
type NodeChangeEventWithInfo ¶ added in v1.2.0
type NodeChangeEventWithInfo struct { tree.NodeChangeEvent // contains filtered or unexported fields }
type NodeEventsBatcher ¶ added in v1.2.0
type NodeEventsBatcher struct {
// contains filtered or unexported fields
}
NodeEventsBatcher buffers events with same node.uuid and flatten them into one where possible
func NewEventsBatcher ¶ added in v1.2.0
func NewEventsBatcher(timeout time.Duration, uuid string, out chan *NodeChangeEventWithInfo, done chan string) *NodeEventsBatcher
NewEventsBatcher creates a new NodeEventsBatcher
func (*NodeEventsBatcher) Flush ¶ added in v1.2.0
func (n *NodeEventsBatcher) Flush()
Flush applies the events buffered as one
type WebsocketHandler ¶
type WebsocketHandler struct { Websocket *melody.Melody EventRouter *views.RouterEventFilter // contains filtered or unexported fields }
func NewWebSocketHandler ¶
func NewWebSocketHandler(serviceCtx context.Context) *WebsocketHandler
func (*WebsocketHandler) BroadcastActivityEvent ¶
func (w *WebsocketHandler) BroadcastActivityEvent(ctx context.Context, event *activity.PostActivityEvent) error
BroadcastActivityEvent listens to activities and broadcast them to sessions with the adequate user.
func (*WebsocketHandler) BroadcastIDMChangeEvent ¶
func (w *WebsocketHandler) BroadcastIDMChangeEvent(ctx context.Context, event *idm.ChangeEvent) error
BroadcastIDMChangeEvent listens to ACL events and broadcast them to sessions if the Role, User, or Workspace is concerned This triggers a registry reload in the UX (and eventually a change of permissions)
func (*WebsocketHandler) BroadcastNodeChangeEvent ¶
func (w *WebsocketHandler) BroadcastNodeChangeEvent(ctx context.Context, event *NodeChangeEventWithInfo) error
BroadcastNodeChangeEvent will browse the currently registered websocket sessions and decide whether to broadcast the event or not.
func (*WebsocketHandler) BroadcastTaskChangeEvent ¶
func (w *WebsocketHandler) BroadcastTaskChangeEvent(ctx context.Context, event *jobs.TaskChangeEvent) error
BroadcastTaskChangeEvent listens to tasks events and broadcast them to sessions with the adequate user.
func (*WebsocketHandler) HandleNodeChangeEvent ¶ added in v1.2.0
func (w *WebsocketHandler) HandleNodeChangeEvent(ctx context.Context, event *tree.NodeChangeEvent) error
HandleNodeChangeEvent listens to NodeChangeEvents and either broadcast them directly, or use NodeEventsBatcher to buffer them and flatten them into one.
func (*WebsocketHandler) InitHandlers ¶
func (w *WebsocketHandler) InitHandlers(serviceCtx context.Context)