websocket

package
v4.2.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 17, 2023 License: AGPL-3.0 Imports: 38 Imported by: 0

README

WebSocket Service

This service provides an Http server supporting WebSocket protocol to send realtime events to user's interfaces. It is hooked to various internal Topics and contains most of the logic to decide whether to broadcast a message to a given connected user.

Service is listening by default on the [::]:5050/ws and [::]:5050/chat (see below) endpoints.

Currently only the ReactJS interface is using this connection to update in real-time.

Internal Events to Websocket Sessions

Sessions

Websocket server is implemented internaly using GIN and Melody libraries. WS connections are identified as "Sessions", and when a client is connecting, it must send a first "register:" message containing the current JWT token, as it would for any other public API in the micro-services architecture. When received, the JWT is parsed and the current user workspaces are stored in the session.

When an internal event arrives on the micro event bus, the websocket logic will map the various active sessions and generally use their username or list of workspaces to broadcast the event to each of them (or not).

Wired Events
  • common.TOPIC_TREE_CHANGES : Sends tree events to dynamically update files and folders in the interface
  • common.TOPIC_META_CHANGES : Same as TREE_CHANGES
  • common.TOPIC_JOB_TASK_EVENT : Send Task events to show tasks progression
  • common.TOPIC_IDM_EVENT : Identity Management are sent to user to trigger a reload of their roles and ACL's
  • common.TOPIC_ACTIVITY_EVENT : Activities events will refresh events feeds and alerts

Chat Handler

A dedicated handler is listening on [::]:5050/chat and is specifically plugged to the internal CHAT topic to dynamically create rooms, connect users to them, etc. It provides a gateway for all chat operations, using the following protocol :

  • Connection to websocket : as for the common ws, send a "subscribe" message with the JWT token
  • Connection to a chat room: JOIN message, and a ChatRoom json representation (including chat type and chat Uuid. It will create the room if not already existing, and send back all the previous messages for this room
  • Post a message : POST message, with ChatMessage and ChatRoom
  • Disconnect from a room: LEAVE message with the ChatRoom representation.

Documentation

Overview

Package websocket starts a WebSocket service forwarding internal events to http clients

Index

Constants

View Source
const (
	SessionRolesKey              = "roles"
	SessionWorkspacesKey         = "workspaces"
	SessionSettingsWorkspacesKey = "settings"
	SessionAccessListKey         = "accessList"
	SessionUsernameKey           = "user"
	SessionProfileKey            = "profile"
	SessionClaimsKey             = "claims"
	SessionSubjectsKey           = "subjects"
	SessionLimiterKey            = "limiter"
	SessionMetaContext           = "metaContext"
)
View Source
const LimiterBurst = 20
View Source
const LimiterRate = 30
View Source
const (
	SessionRoomKey = "room"
)

Variables

This section is empty.

Functions

func ClearSession

func ClearSession(session *melody.Session)

func Marshal

func Marshal(m Message) []byte

func NewErrorMessage

func NewErrorMessage(e error) []byte

func NewErrorMessageString

func NewErrorMessageString(e string) []byte

Types

type ChatHandler

type ChatHandler struct {
	Websocket *melody.Melody
	Pool      nodes.SourcesPool
	// contains filtered or unexported fields
}

func NewChatHandler

func NewChatHandler(ctx context.Context) *ChatHandler

NewChatHandler creates a new ChatHandler

func (*ChatHandler) BroadcastChatMessage

func (c *ChatHandler) BroadcastChatMessage(ctx context.Context, msg *chat.ChatEvent) error

BroadcastChatMessage sends chat message to connected sessions

type ChatMessage

type ChatMessage struct {
	Type     ChatMessageType `json:"@type"`
	RoomType chat.RoomType
	Payload  string
}

type ChatMessageType

type ChatMessageType string
const (
	JoinRoom    ChatMessageType = "join"
	LeaveRoom   ChatMessageType = "leave"
	PostMessage ChatMessageType = "msg"
)

type Message

type Message struct {
	Type  MessageType `json:"@type"`
	JWT   string      `json:"jwt"`
	Error string      `json:"error"`
}

Message passes JWT

type MessageType

type MessageType string
const (
	MsgSubscribe   MessageType = "subscribe"
	MsgUnsubscribe MessageType = "unsubscribe"
	MsgError       MessageType = "error"
)

type NodeChangeEventWithInfo

type NodeChangeEventWithInfo struct {
	tree.NodeChangeEvent
	// contains filtered or unexported fields
}

type NodeEventsBatcher

type NodeEventsBatcher struct {
	// contains filtered or unexported fields
}

NodeEventsBatcher buffers events with same node.uuid and flatten them into one where possible

func NewEventsBatcher

func NewEventsBatcher(timeout time.Duration, uuid string, out chan *NodeChangeEventWithInfo, done chan string) *NodeEventsBatcher

NewEventsBatcher creates a new NodeEventsBatcher

func (*NodeEventsBatcher) EnqueueWithRecover added in v4.0.1

func (n *NodeEventsBatcher) EnqueueWithRecover(ev *NodeChangeEventWithInfo) (err error)

func (*NodeEventsBatcher) Flush

func (n *NodeEventsBatcher) Flush()

Flush applies the events buffered as one

type WebsocketHandler

type WebsocketHandler struct {
	Websocket   *melody.Melody
	EventRouter *compose.Reverse
	// contains filtered or unexported fields
}

func NewWebSocketHandler

func NewWebSocketHandler(serviceCtx context.Context) *WebsocketHandler

func (*WebsocketHandler) BroadcastActivityEvent

func (w *WebsocketHandler) BroadcastActivityEvent(ctx context.Context, event *activity.PostActivityEvent) error

BroadcastActivityEvent listens to activities and broadcast them to sessions with the adequate user.

func (*WebsocketHandler) BroadcastIDMChangeEvent

func (w *WebsocketHandler) BroadcastIDMChangeEvent(ctx context.Context, event *idm.ChangeEvent) error

BroadcastIDMChangeEvent listens to ACL events and broadcast them to sessions if the Role, User, or Workspace is concerned This triggers a registry reload in the UX (and eventually a change of permissions)

func (*WebsocketHandler) BroadcastNodeChangeEvent

func (w *WebsocketHandler) BroadcastNodeChangeEvent(ctx context.Context, event *NodeChangeEventWithInfo) error

BroadcastNodeChangeEvent will browse the currently registered websocket sessions and decide whether to broadcast the event or not.

func (*WebsocketHandler) BroadcastTaskChangeEvent

func (w *WebsocketHandler) BroadcastTaskChangeEvent(ctx context.Context, event *jobs.TaskChangeEvent) error

BroadcastTaskChangeEvent listens to tasks events and broadcast them to sessions with the adequate user.

func (*WebsocketHandler) HandleNodeChangeEvent

func (w *WebsocketHandler) HandleNodeChangeEvent(ctx context.Context, event *tree.NodeChangeEvent) error

HandleNodeChangeEvent listens to NodeChangeEvents and either broadcast them directly, or use NodeEventsBatcher to buffer them and flatten them into one.

func (*WebsocketHandler) InitHandlers

func (w *WebsocketHandler) InitHandlers(ctx context.Context)

func (*WebsocketHandler) MatchPolicies

func (w *WebsocketHandler) MatchPolicies(ctx context.Context, session *melody.Session, policies []*service.ResourcePolicy, action service.ResourcePolicyAction) bool

MatchPolicies creates a memory-based policy stack checker to check if action is allowed or denied. It uses a DenyByDefault strategy

Directories

Path Synopsis
Package api starts the actual WebSocket service
Package api starts the actual WebSocket service

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL