websocket

package
v2.1.0+incompatible
Warning

This package is not in the latest version of its module.

Published: Jun 29, 2020 License: AGPL-3.0 Imports: 25 Imported by: 9

README

WebSocket Service

This service provides an HTTP server supporting the WebSocket protocol to send real-time events to user interfaces. It is hooked to various internal Topics and contains most of the logic that decides whether to broadcast a message to a given connected user.

By default, the service listens on the [::]:5050/ws and [::]:5050/chat endpoints (see below for the latter).

Currently, only the ReactJS interface uses this connection to update in real time.

Internal Events to Websocket Sessions

Sessions

The WebSocket server is implemented internally using the GIN and Melody libraries. WS connections are identified as "Sessions". When a client connects, it must first send a "subscribe" message containing the current JWT token, as it would for any other public API in the micro-services architecture. When the message is received, the JWT is parsed and the current user's workspaces are stored in the session.
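As an illustration of the first frame a client sends, here is a minimal sketch that serializes a message shaped like the Message type and MsgSubscribe constant documented further down. The helper name NewSubscribeFrame and the token value are assumptions made for this example and are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageType and Message mirror the types listed in this package's documentation.
type MessageType string

const MsgSubscribe MessageType = "subscribe"

type Message struct {
	Type  MessageType `json:"@type"`
	JWT   string      `json:"jwt"`
	Error string      `json:"error"`
}

// NewSubscribeFrame is a hypothetical helper (not part of the package) that
// builds the JSON frame a client would send right after connecting.
func NewSubscribeFrame(jwt string) ([]byte, error) {
	return json.Marshal(Message{Type: MsgSubscribe, JWT: jwt})
}

func main() {
	// "my-jwt-token" is a placeholder, not a valid token.
	frame, err := NewSubscribeFrame("my-jwt-token")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame))
}
```

The resulting bytes would then be written as a text frame on the /ws connection by whichever WebSocket client library is in use.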

When an internal event arrives on the micro event bus, the websocket logic iterates over the active sessions and generally uses their username or list of workspaces to decide whether or not to broadcast the event to each of them.
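The per-session decision can be pictured as a predicate evaluated for every active session. The sketch below is a simplified assumption of that logic: the Session and Event types and the ShouldBroadcast function are hypothetical stand-ins, not the package's actual types.

```go
package main

import "fmt"

// Session is a hypothetical stand-in for the data stored on a WebSocket
// session after the JWT has been parsed.
type Session struct {
	Username   string
	Workspaces map[string]bool // set of workspace UUIDs the user can access
}

// Event is a hypothetical internal event: it targets either a single user
// or every session that can see one of the listed workspaces.
type Event struct {
	TargetUser     string
	WorkspaceUUIDs []string
}

// ShouldBroadcast reports whether the event should be sent to the session.
func ShouldBroadcast(s Session, e Event) bool {
	if e.TargetUser != "" {
		return s.Username == e.TargetUser
	}
	for _, ws := range e.WorkspaceUUIDs {
		if s.Workspaces[ws] {
			return true
		}
	}
	return false
}

func main() {
	alice := Session{Username: "alice", Workspaces: map[string]bool{"ws-1": true}}
	fmt.Println(ShouldBroadcast(alice, Event{WorkspaceUUIDs: []string{"ws-1"}}))
	fmt.Println(ShouldBroadcast(alice, Event{TargetUser: "bob"}))
}
```

With Melody, a predicate of this kind would typically be passed to a filtered broadcast call, so that the message is only written to the sessions for which it returns true.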

Wired Events
  • common.TOPIC_TREE_CHANGES : Sends tree events to dynamically update files and folders in the interface
  • common.TOPIC_META_CHANGES : Same as TREE_CHANGES
  • common.TOPIC_JOB_TASK_EVENT : Sends task events to show task progress
  • common.TOPIC_IDM_EVENT : Identity Management events are sent to users to trigger a reload of their roles and ACLs
  • common.TOPIC_ACTIVITY_EVENT : Activity events refresh event feeds and alerts

Chat Handler

A dedicated handler listens on [::]:5050/chat and is plugged into the internal CHAT topic to dynamically create rooms, connect users to them, etc. It provides a gateway for all chat operations, using the following protocol:

  • Connection to websocket: as for the common ws, send a "subscribe" message with the JWT token
  • Connection to a chat room: JOIN message, with a ChatRoom JSON representation (including chat type and chat Uuid). It will create the room if it does not already exist, and send back all the previous messages for this room
  • Post a message: POST message, with ChatMessage and ChatRoom
  • Disconnect from a room: LEAVE message, with the ChatRoom representation
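To make the message envelope more concrete, the following sketch encodes a message shaped like the ChatMessage type and ChatMessageType constants documented further down. Several parts are assumptions: RoomType is a local stand-in for chat.RoomType, EncodeChatMessage is a hypothetical helper, and the payload value is a placeholder, since what the Payload field carries for each operation is not specified here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChatMessageType and its values mirror the constants listed in this package.
type ChatMessageType string

const (
	JoinRoom    ChatMessageType = "join"
	LeaveRoom   ChatMessageType = "leave"
	PostMessage ChatMessageType = "msg"
)

// RoomType is a local stand-in for chat.RoomType (assumption: an integer enum).
type RoomType int32

// ChatMessage mirrors the documented struct; only Type carries a JSON tag.
type ChatMessage struct {
	Type     ChatMessageType `json:"@type"`
	RoomType RoomType
	Payload  string
}

// EncodeChatMessage is a hypothetical helper that serializes one envelope.
func EncodeChatMessage(t ChatMessageType, rt RoomType, payload string) ([]byte, error) {
	return json.Marshal(ChatMessage{Type: t, RoomType: rt, Payload: payload})
}

func main() {
	// "example-payload" is a placeholder value.
	data, err := EncodeChatMessage(JoinRoom, 0, "example-payload")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```

The same helper would produce the leave and post envelopes by passing LeaveRoom or PostMessage as the type.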

Documentation

Overview

Package websocket starts a WebSocket service forwarding internal events to HTTP clients.

Index

Constants

const LimiterBurst = 20
const LimiterRate = 30
const SessionAccessListKey = "accessList"
const SessionClaimsKey = "claims"
const SessionLimiterKey = "limiter"
const SessionProfileKey = "profile"
const SessionRolesKey = "roles"
const (
	SessionRoomKey = "room"
)
const SessionUsernameKey = "user"
const SessionWorkspacesKey = "workspaces"

Variables

This section is empty.

Functions

func ClearSession

func ClearSession(session *melody.Session)

func Marshal

func Marshal(m Message) []byte

func NewErrorMessage

func NewErrorMessage(e error) []byte

func NewErrorMessageString

func NewErrorMessageString(e string) []byte

func UpdateSessionFromClaims

func UpdateSessionFromClaims(session *melody.Session, claims claim.Claims, pool *views.ClientsPool)

Types

type ChatHandler

type ChatHandler struct {
	Websocket *melody.Melody
	Pool      *views.ClientsPool
}

func NewChatHandler

func NewChatHandler(serviceCtx context.Context) *ChatHandler

func (*ChatHandler) AppendUserToRoom

func (c *ChatHandler) AppendUserToRoom(room *chat.ChatRoom, userName string)

func (*ChatHandler) BroadcastChatMessage

func (c *ChatHandler) BroadcastChatMessage(ctx context.Context, msg *chat.ChatEvent) error

func (*ChatHandler) FindOrCreateRoom

func (c *ChatHandler) FindOrCreateRoom(ctx context.Context, room *chat.ChatRoom, createIfNotExists bool) (*chat.ChatRoom, error)

func (*ChatHandler) InitHandlers

func (c *ChatHandler) InitHandlers(serviceCtx context.Context)

func (*ChatHandler) RemoveUserFromRoom

func (c *ChatHandler) RemoveUserFromRoom(room *chat.ChatRoom, userName string)

type ChatMessage

type ChatMessage struct {
	Type     ChatMessageType `json:"@type"`
	RoomType chat.RoomType
	Payload  string
}

type ChatMessageType

type ChatMessageType string
const (
	JoinRoom    ChatMessageType = "join"
	LeaveRoom   ChatMessageType = "leave"
	PostMessage ChatMessageType = "msg"
)

type Message

type Message struct {
	Type  MessageType `json:"@type"`
	JWT   string      `json:"jwt"`
	Error string      `json:"error"`
}

Should pass JWT instead of username

type MessageType

type MessageType string
const (
	MsgSubscribe   MessageType = "subscribe"
	MsgUnsubscribe MessageType = "unsubscribe"
	MsgError       MessageType = "error"
)

type NodeChangeEventWithInfo added in v1.2.0

type NodeChangeEventWithInfo struct {
	tree.NodeChangeEvent
	// contains filtered or unexported fields
}

type NodeEventsBatcher added in v1.2.0

type NodeEventsBatcher struct {
	// contains filtered or unexported fields
}

NodeEventsBatcher buffers events with the same node.uuid and flattens them into one where possible.

func NewEventsBatcher added in v1.2.0

func NewEventsBatcher(timeout time.Duration, uuid string, out chan *NodeChangeEventWithInfo, done chan string) *NodeEventsBatcher

NewEventsBatcher creates a new NodeEventsBatcher

func (*NodeEventsBatcher) Flush added in v1.2.0

func (n *NodeEventsBatcher) Flush()

Flush applies the buffered events as one.
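The buffering idea can be sketched as follows. This is not the package's implementation: NodeEvent is a simplified stand-in for tree.NodeChangeEvent, the Batcher type and its methods are hypothetical, the timeout-driven flush is left out, and the merge rule (first source, last target, last type) is an assumption chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// NodeEvent is a simplified, hypothetical stand-in for tree.NodeChangeEvent.
type NodeEvent struct {
	UUID   string
	Type   string // e.g. "create", "update"
	Source string // path before the change
	Target string // path after the change
}

// Batcher buffers the events received for a single node UUID.
type Batcher struct {
	mu     sync.Mutex
	uuid   string
	buffer []NodeEvent
	out    chan<- NodeEvent
}

// NewBatcher creates a batcher that emits merged events on out.
func NewBatcher(uuid string, out chan<- NodeEvent) *Batcher {
	return &Batcher{uuid: uuid, out: out}
}

// Add appends an event to the buffer.
func (b *Batcher) Add(e NodeEvent) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.buffer = append(b.buffer, e)
}

// Flush merges the buffered events into one and sends it on out.
// Assumed merge rule: keep the first source, the last target and the last type.
func (b *Batcher) Flush() {
	b.mu.Lock()
	events := b.buffer
	b.buffer = nil
	b.mu.Unlock()
	if len(events) == 0 {
		return // nothing buffered, nothing to emit
	}
	last := events[len(events)-1]
	b.out <- NodeEvent{
		UUID:   b.uuid,
		Type:   last.Type,
		Source: events[0].Source,
		Target: last.Target,
	}
}

func main() {
	out := make(chan NodeEvent, 1)
	b := NewBatcher("n1", out)
	b.Add(NodeEvent{UUID: "n1", Type: "update", Source: "/a.txt", Target: "/b.txt"})
	b.Add(NodeEvent{UUID: "n1", Type: "update", Source: "/b.txt", Target: "/c.txt"})
	b.Flush()
	fmt.Printf("%+v\n", <-out)
}
```

In a fuller version, a timer started at construction would call Flush once the timeout elapses, so that a burst of changes on the same node reaches clients as a single event.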

type WebsocketHandler

type WebsocketHandler struct {
	Websocket   *melody.Melody
	EventRouter *views.RouterEventFilter
	// contains filtered or unexported fields
}

func NewWebSocketHandler

func NewWebSocketHandler(serviceCtx context.Context) *WebsocketHandler

func (*WebsocketHandler) BroadcastActivityEvent

func (w *WebsocketHandler) BroadcastActivityEvent(ctx context.Context, event *activity.PostActivityEvent) error

BroadcastActivityEvent listens to activities and broadcasts them to the sessions of the relevant user.

func (*WebsocketHandler) BroadcastIDMChangeEvent

func (w *WebsocketHandler) BroadcastIDMChangeEvent(ctx context.Context, event *idm.ChangeEvent) error

BroadcastIDMChangeEvent listens to ACL events and broadcasts them to sessions if the Role, User, or Workspace is concerned. This triggers a registry reload in the UX (and possibly a change of permissions).

func (*WebsocketHandler) BroadcastNodeChangeEvent

func (w *WebsocketHandler) BroadcastNodeChangeEvent(ctx context.Context, event *NodeChangeEventWithInfo) error

BroadcastNodeChangeEvent will browse the currently registered websocket sessions and decide whether to broadcast the event or not.

func (*WebsocketHandler) BroadcastTaskChangeEvent

func (w *WebsocketHandler) BroadcastTaskChangeEvent(ctx context.Context, event *jobs.TaskChangeEvent) error

BroadcastTaskChangeEvent listens to task events and broadcasts them to the sessions of the relevant user.

func (*WebsocketHandler) HandleNodeChangeEvent added in v1.2.0

func (w *WebsocketHandler) HandleNodeChangeEvent(ctx context.Context, event *tree.NodeChangeEvent) error

HandleNodeChangeEvent listens to NodeChangeEvents and either broadcasts them directly, or uses a NodeEventsBatcher to buffer them and flatten them into one.

func (*WebsocketHandler) InitHandlers

func (w *WebsocketHandler) InitHandlers(serviceCtx context.Context)

Directories

Path Synopsis
Package api starts the actual WebSocket service