server

package
v0.0.0-...-9d4eb55 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 3, 2024 License: GPL-3.0 Imports: 54 Imported by: 0

Documentation

Overview

*****************************************************************************

  • Description :
  • Topic in a cluster which serves as a local representation of the master
  • topic hosted at another node. ****************************************************************************

Index

Constants

View Source
const (
	// StopNone no reason given/default.
	StopNone = iota
	// StopShutdown terminated due to system shutdown.
	StopShutdown
	// StopDeleted terminated due to being deleted.
	StopDeleted
	// StopRehashing terminated due to cluster rehashing (moved to a different node).
	StopRehashing
)

Reasons why topic is being shut down.

Variables

This section is empty.

Functions

func Init

func Init(sa ServerArgs)

Init() inits global config and logger, utils and ...

func InitUsersGarbageCollection

func InitUsersGarbageCollection(period time.Duration, blockSize, minAccountAgeHours int) chan<- bool

InitUsersGarbageCollection() runs every 'period' and deletes up to 'blockSize'

stale un-validated user accounts which have been last updated at least 'minAccountAgeHours' hours.

Returns channel which can be used to stop the process.

func NewCluster

func NewCluster(ca ClusterArgs) (int, error)

NewCluster() returns snowflake worker id (pass nil if you don't want to use cluster)

Cluster won't be started here yet.

Types

type ClientComMessage

type ClientComMessage struct {
	Hi    *MsgClientHi    `json:"hi"`
	Acc   *MsgClientAcc   `json:"acc"`
	Login *MsgClientLogin `json:"login"`
	Sub   *MsgClientSub   `json:"sub"`
	Leave *MsgClientLeave `json:"leave"`
	Pub   *MsgClientPub   `json:"pub"`
	Get   *MsgClientGet   `json:"get"`
	Set   *MsgClientSet   `json:"set"`
	Del   *MsgClientDel   `json:"del"`
	Note  *MsgClientNote  `json:"note"`
	// Optional data.
	Extra *MsgClientExtra `json:"extra"`

	// Message Id de-normalized
	Id string `json:"-"`
	// Un-routable (original) topic name de-normalized from XXX.Topic.
	Original string `json:"-"`
	// Routable (expanded) topic name.
	RcptTo string `json:"-"`
	// Sender's UserId as string.
	AsUser string `json:"-"`
	// Sender's authentication level.
	AuthLvl int `json:"-"`
	// De-normalized 'what' field of meta messages (set, get, del).
	MetaWhat int `json:"-"`
	// Timestamp when this message was received by the server.
	Timestamp time.Time `json:"-"`
	// contains filtered or unexported fields
}

ClientComMessage is a wrapper for client messages.

type Cluster

type Cluster struct {
	// contains filtered or unexported fields
}

Cluster is the representation of the cluster.

func (*Cluster) Health

func (c *Cluster) Health(health *ClusterHealth, unused *bool) error

Health is called by the leader node to assert leadership and check status of the followers.

func (*Cluster) Ping

func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error

Ping is a gRPC endpoint which receives ping requests from peer nodes.Used to detect node restarts.

func (*Cluster) Route

func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error

Route endpoint receives intra-cluster messages destined for the nodes hosting the topic. Called by Hub.route channel consumer for messages send without attaching to topic first.

func (*Cluster) TopicMaster

func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error

TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic.

func (Cluster) TopicProxy

func (Cluster) TopicProxy(msg *ClusterResp, unused *bool) error

TopicProxy is a gRPC endpoint at topic proxy which receives topic master responses.

func (*Cluster) UserCacheUpdate

func (c *Cluster) UserCacheUpdate(msg *UserCacheReq, rejected *bool) error

UserCacheUpdate endpoint receives updates to user's cached values as well as sends push notifications.

func (*Cluster) Vote

func (c *Cluster) Vote(vreq *ClusterVoteRequest, response *ClusterVoteResponse) error

Vote processes request for a vote from a candidate.

type ClusterArgs

type ClusterArgs struct {
	Cfg    *config.ClusterConfig
	Logger *logger.Logger
	Stats  *stats.Stats
}

type ClusterFailover

type ClusterFailover struct {
	// contains filtered or unexported fields
}

Failover config.

type ClusterHealth

type ClusterHealth struct {
	// Name of the leader node
	Leader string
	// Election term
	Term int
	// Ring hash signature that represents the cluster
	Signature string
	// Names of nodes currently active in the cluster
	Nodes []string
}

ClusterHealth is content of a leader's health check of a follower node.

type ClusterNode

type ClusterNode struct {
	// contains filtered or unexported fields
}

ClusterNode is a client's connection to another node.

type ClusterPing

type ClusterPing struct {
	// Name of the node sending this request.
	Node string

	// Fingerprint of the node sending this request.
	// Fingerprint changes when the node restarts.
	Fingerprint int64
}

ClusterPing is used to detect node restarts.

type ClusterReq

type ClusterReq struct {
	// Name of the Node sending this request
	Node string
	// Ring hash Signature of the node sending this request Signature must match
	// the Signature of the receiver, otherwise the Cluster is desynchronized.
	Signature string
	// Fingerprint of the node sending this request.
	// Fingerprint changes when the node is restarted.
	Fingerprint int64

	// Type of request.
	ReqType ProxyReqType

	// Client message. Set for C2S requests.
	CliMsg *ClientComMessage
	// Message to be routed. Set for intra-cluster route requests.
	SrvMsg *ServerComMessage

	// Expanded (routable) topic name
	RcptTo string
	// Originating session
	Sess *ClusterSess
	// True when the topic proxy is Gone.
	Gone bool
}

ClusterReq is either a Proxy to Master or Topic Proxy to Topic Master or intra-cluster routing request message.

type ClusterResp

type ClusterResp struct {
	// Server message with the response.
	SrvMsg *ServerComMessage
	// Originating session ID to forward response to, if any.
	OrigSid string
	// Expanded (routable) topic name
	RcptTo string

	// Original request type.
	OrigReqType ProxyReqType
}

ClusterResp is a Master to Proxy response message.

type ClusterRoute

type ClusterRoute struct {
	// Name of the node sending this request
	Node string

	// Ring hash signature of the node sending this request
	// Signature must match the signature of the receiver, otherwise the
	// Cluster is desynchronized.
	Signature string

	// Fingerprint of the node sending this request.
	// Fingerprint changes when the node is restarted.
	Fingerprint int64

	// Message to be routed. Set for intra-cluster route requests.
	SrvMsg *ServerComMessage

	// Originating session
	Sess *ClusterSess
}

ClusterRoute is intra-cluster routing request message.

type ClusterSess

type ClusterSess struct {
	// IP address of the client. For long polling this is the IP of the last poll
	RemoteAddr string

	// User agent, a string provided by an authenticated client in {login} packet
	UserAgent string

	// ID of the current user or 0
	Uid types.Uid

	// User's authentication level
	AuthLvl at.Level

	// Protocol version of the client: ((major & 0xff) << 8) | (minor & 0xff)
	Ver int

	// Human language of the client
	Lang string
	// Country of the client
	CountryCode string

	// Device ID
	DeviceID string

	// Device Platform: "web", "ios", "android"
	Platform string

	// Session ID
	Sid string

	// Background session
	Background bool
}

ClusterSess is a basic info on a remote session where the message was created.

type ClusterSessUpdate

type ClusterSessUpdate struct {
	// User this session represents.
	Uid types.Uid
	// Session id.
	Sid string
	// Session user agent.
	UserAgent string
}

ClusterSessUpdate represents a request to update a session. User Agent change or background session comes to foreground.

type ClusterVote

type ClusterVote struct {
	// contains filtered or unexported fields
}

ClusterVote is a vote request and a response in leader election.

type ClusterVoteRequest

type ClusterVoteRequest struct {
	// Candidate node which issued this request
	Node string
	// Election term
	Term int
}

ClusterVoteRequest is a request from a leader candidate to a node to vote for the candidate.

type ClusterVoteResponse

type ClusterVoteResponse struct {
	// Actual vote
	Result bool
	// Node's term after the vote
	Term int
}

ClusterVoteResponse is a vote from a node.

type CredValidator

type CredValidator struct {
	// AuthLevel(s) which require this validator.
	RequiredAuthLvl []types.Level
	AddToTags       bool
}

CredValidator holds additional config params for a credential validator.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub is the core structure which holds topics.

type MsgAccessMode

type MsgAccessMode struct {
	// Access mode requested by the user
	Want string
	// Access mode granted to the user by the admin
	Given string
	// Cumulative access mode want & given
	Mode string
}

MsgAccessMode is a definition of access mode.

type MsgClientAcc

type MsgClientAcc struct {
	// message id
	Id string
	// "newXYZ" to create a new user or UserId to update a user; default: current user.
	User string
	// Temporary authentication parameters for one-off actions, like password reset.
	TmpScheme string
	TmpSecret []byte
	// Account state: normal, suspended.
	State string
	// Authentication level of the user when UserID is set and not equal to the current user.
	// Either "", "auth" or "anon". Default: ""
	AuthLevel string
	// The initial authentication scheme the account can use
	Scheme string
	// Shared secret
	Secret []byte
	// Authenticate session with the newly created account
	Login bool
	// Indexable tags for user discovery
	Tags []string
	// User initialization data when creating a new user, otherwise ignored
	Desc *MsgSetDesc
	// Credentials to verify (email or phone or captcha)
	Cred []MsgCredClient
}

MsgClientAcc is an {acc} message for creating or updating a user account.

type MsgClientDel

type MsgClientDel struct {
	Id    string `json:"id,omitempty"`
	Topic string `json:"topic,omitempty"`
	// What to delete:
	// * "msg" to delete messages (default)
	// * "topic" to delete the topic
	// * "sub" to delete a subscription to topic.
	// * "user" to delete or disable user.
	// * "cred" to delete credential (email or phone)
	What string `json:"what"`
	// Delete messages with these IDs (either one by one or a set of ranges)
	DelSeq []MsgDelRange `json:"del_seq"`
	// User ID of the user or subscription to delete
	User string `json:"user"`
	// Credential to delete
	Cred *MsgCredClient `json:"cred"`
	// Request to hard-delete objects (i.e. delete messages for all users), if such option is available.
	Hard bool `json:"hard"`
}

MsgClientDel delete messages or topic {del}.

type MsgClientExtra

type MsgClientExtra struct {
	// Array of out-of-band attachments which have to be exempted from GC.
	Attachments []string `json:"attachments,omitempty"`
	// Alternative user ID set by the root user (obo = On Behalf Of).
	AsUser string `json:"obo,omitempty"`
	// Altered authentication level set by the root user.
	AuthLevel string `json:"authlevel,omitempty"`
}

MsgClientExtra is not a stand-alone message but extra data which augments the main payload.

type MsgClientGet

type MsgClientGet struct {
	Id    string `json:"id,omitempty"`
	Topic string `json:"topic"`
	MsgGetQuery
}

MsgClientGet is a query of topic state {get}.

type MsgClientHi

type MsgClientHi struct {
	Id        string `json:"id"`
	UserAgent string `json:"user_agent"`
	Version   string `json:"version"`   // Protocol version, i.e. "0.13"
	DeviceID  string `json:"device_id"` // Client's unique device ID
	Lang      string `json:"lang"`      // ISO 639-1 human language of the connected device
	Platform  string `json:"platform"`  // Platform code: ios, android, web.
	// Session is initially in non-iteractive, i.e. issued by a service. Presence notifications are delayed.
	Background bool `json:"background"`
}

MsgClientHi is a handshake {hi} message

type MsgClientLeave

type MsgClientLeave struct {
	Id    string `json:"id"`
	Topic string `json:"topic"`
	Unsub bool   `json:"unsub"`
}

MsgClientLeave is an unsubscribe {leave} request message.

type MsgClientLogin

type MsgClientLogin struct {
	// message id
	Id string
	// Authentication scheme
	Scheme string
	// Shared secret
	Secret []byte
	// Credntials being verified (email or phone or captcha etc.)
	Cred []MsgCredClient
}

MsgClientLogin is a login {login} message.

type MsgClientNote

type MsgClientNote struct {
	// There is no Id -- server will not akn {ping} packets, they are "fire and forget"
	Topic string `json:"topic"`
	// what is being reported: "recv" - message received, "read" - message read, "kp" - typing notification
	What string `json:"what"`
	// Server-issued message ID being reported
	SeqId int `json:"seq,omitempty"`
	// Client's count of unread messages to report back to the server. Used in push notifications on iOS.
	Unread int `json:"unread,omitempty"`
	// Call event.
	Event string `json:"event,omitempty"`
	// Arbitrary json payload (used in video calls).
	Payload json.RawMessage `json:"payload,omitempty"`
}

MsgClientNote is a client-generated notification for topic subscribers {note}.

type MsgClientPub

type MsgClientPub struct {
	Id      string         `json:"id"`
	Topic   string         `json:"topic"`
	NoEcho  bool           `json:"no_echo"`
	Head    map[string]any `json:"head"`
	Content any            `json:"content"`
}

MsgClientPub is client's request to publish data to topic subscribers {pub}.

type MsgClientSet

type MsgClientSet struct {
	Id    string `json:"id,omitempty"`
	Topic string `json:"topic"`
	MsgSetQuery
}

MsgClientSet is an update of topic state {set}.

type MsgClientSub

type MsgClientSub struct {
	Id    string `json:"id"`
	Topic string `json:"topic"`

	// Mirrors {set}.
	Set *MsgSetQuery `json:"set"`
	// Mirrors {get}.
	Get *MsgGetQuery `json:"get"`

	// True if this subscription created a new topic. In case of p2p topics, it's true if the other
	// user's subscription was created (as a part of new topic creation or just alone).
	Created bool `json:"-"`
	// True if this is a new subscription.
	Newsub bool `json:"-"`
}

MsgClientSub is a subscription request {sub} message.

type MsgCredClient

type MsgCredClient struct {
	// Credential type, i.e. `email` or `tel`.
	Method string
	// Value to verify, i.e. `user@example.com` or `+18003287448`
	Value string
	// Verification response
	Response string
	// Request parameters, such as preferences. Passed to valiator without interpretation.
	Params map[string]any
}

MsgCredClient is an account credential such as email or phone number.

type MsgCredServer

type MsgCredServer struct {
	// Credential type, i.e. `email` or `tel`.
	Method string `json:"meth,omitempty"`
	// Credential value, i.e. `user@example.com` or `+18003287448`
	Value string `json:"val,omitempty"`
	// Indicates that the credential is validated.
	Done bool `json:"done,omitempty"`
}

MsgCredServer is an account credential such as email or phone number.

type MsgDefaultAcsMode

type MsgDefaultAcsMode struct {
	Auth string `mapstructure:"auth"`
	Anon string `mapstructure:"anon"`
}

MsgDefaultAcsMode is a C2S in set.what == "desc", acc, sub message.

type MsgDelRange

type MsgDelRange struct {
	LowId int `json:"low,omitempty"`
	HiId  int `json:"hi,omitempty"`
}

MsgDelRange is either an individual ID (HiID=0) or a randge of deleted IDs,

low end inclusive (closed), high-end exclusive (open): [LowId .. HiId), e.g. 1..5 -> 1, 2, 3, 4.

type MsgDelValues

type MsgDelValues struct {
	DelId  int           `json:"clear,omitempty"`
	DelSeq []MsgDelRange `json:"delseq,omitempty"`
}

MsgDelValues describes request to delete messages.

type MsgGetOpts

type MsgGetOpts struct {
	// Optional User ID to return result(s) for one user.
	User string `json:"user"`
	// Optional topic name to return result(s) for one topic.
	Topic string `json:"topic"`
	// Return results modified since this timespamp.
	IfModifiedSince *time.Time `json:"if_modified_since"`
	// Load messages/ranges with IDs equal or greater than this (inclusive or closed)
	SinceId int `json:"since_id"`
	// Load messages/ranges with IDs lower than this (exclusive or open)
	BeforeId int `json:"before_id"`
	// Limit the number of messages loaded
	Limit int `json:"limit"`
}

MsgGetOpts defines Get query parameters.

type MsgGetQuery

type MsgGetQuery struct {
	What string
	// Parameters of "desc" request: IfModifiedSince
	Desc *MsgGetOpts `json:"desc,omitempty"`
	// Parameters of "sub" request: User, Topic, IfModifiedSince, Limit.
	Sub *MsgGetOpts `json:"sub,omitempty"`
	// Parameters of "data" request: Since, Before, Limit.
	Data *MsgGetOpts `json:"data,omitempty"`
	// Parameters of "del" request: Since, Before, Limit.
	Del *MsgGetOpts `json:"del,omitempty"`
}

MsgGetQuery is a topic metadata or data query.

type MsgLastSeenInfo

type MsgLastSeenInfo struct {
	// Timestamp of user's last appearance online.
	When *time.Time `json:"when,omitempty"`
	// User agent of the device when the user was last online.
	UserAgent string `json:"ua,omitempty"`
}

MsgLastSeenInfo contains info on user's appearance online - when & user agent.

type MsgServerCtrl

type MsgServerCtrl struct {
	Id        string    `json:"id,omitempty"`
	Topic     string    `json:"topic,omitempty"`
	Params    any       `json:"params,omitempty"`
	Code      int       `json:"code"`
	Text      string    `json:"text,omitempty"`
	Timestamp time.Time `json:"ts"`
}

MsgServerCtrl is a server control message {ctrl}.

type MsgServerData

type MsgServerData struct {
	Topic string `json:"topic"`
	// ID of the user who originated the message as {pub}, could be empty if sent by the system
	From      string         `json:"from,omitempty"`
	Timestamp time.Time      `json:"ts"`
	DeletedAt *time.Time     `json:"deleted,omitempty"`
	SeqId     int            `json:"seq"`
	Head      map[string]any `json:"head,omitempty"`
	Content   any            `json:"content"`
}

MsgServerData is a server {data} message.

type MsgServerInfo

type MsgServerInfo struct {
	// Topic to send event to.
	Topic string `json:"topic"`
	// Topic where the event has occurred (set only when Topic='me').
	Src string `json:"src,omitempty"`
	// ID of the user who originated the message.
	From string `json:"from,omitempty"`
	// The event being reported: "rcpt" - message received, "read" - message read, "kp" - typing notification, "call" - video call.
	What string `json:"what"`
	// Server-issued message ID being reported.
	SeqId int `json:"seq,omitempty"`
	// Call event.
	Event string `json:"event,omitempty"`
	// Arbitrary json payload (used by video calls).
	Payload json.RawMessage `json:"payload,omitempty"`

	// When sending to 'me', skip sessions subscribed to this topic.
	SkipTopic string `json:"-"`
}

MsgServerInfo is the server-side copy of MsgClientNote with From and optionally Src added (non-authoritative).

type MsgServerMeta

type MsgServerMeta struct {
	Id    string `json:"id,omitempty"`
	Topic string `json:"topic"`

	Timestamp *time.Time `json:"ts,omitempty"`

	// Topic description
	Desc *MsgTopicDesc `json:"desc,omitempty"`
	// Subscriptions as an array of objects
	Sub []MsgTopicSub `json:"sub,omitempty"`
	// Delete ID and the ranges of IDs of deleted messages
	Del *MsgDelValues `json:"del,omitempty"`
	// User discovery tags
	Tags []string `json:"tags,omitempty"`
	// Account credentials, 'me' only.
	Cred []*MsgCredServer `json:"cred,omitempty"`
}

MsgServerMeta is a topic metadata {meta} update.

type MsgServerPres

type MsgServerPres struct {
	Topic     string        `json:"topic"`
	Src       string        `json:"src,omitempty"`
	What      string        `json:"what"`
	UserAgent string        `json:"ua,omitempty"`
	SeqId     int           `json:"seq,omitempty"`
	DelId     int           `json:"clear,omitempty"`
	DelSeq    []MsgDelRange `json:"delseq,omitempty"`
	AcsTarget string        `json:"tgt,omitempty"`
	AcsActor  string        `json:"act,omitempty"`
	// Acs or a delta Acs. Need to marshal it to json under a name different than 'acs'
	// to allow different handling on the client
	Acs *MsgAccessMode `json:"dacs,omitempty"`

	// Flag to break the reply loop
	WantReply bool `json:"-"`

	// Additional access mode filters when sending to topic's online members. Both filter conditions must be true.
	// send only to those who have this access mode.
	FilterIn int `json:"-"`
	// skip those who have this access mode.
	FilterOut int `json:"-"`

	// When sending to 'me', skip sessions subscribed to this topic.
	SkipTopic string `json:"-"`

	// Send to sessions of a single user only.
	SingleUser string `json:"-"`

	// Exclude sessions of a single user.
	ExcludeUser string `json:"-"`
}

MsgServerPres is presence notification {pres} (authoritative update).

type MsgSetDesc

type MsgSetDesc struct {
	DefaultAcs *MsgDefaultAcsMode `json:"dfm"`     // default access mode
	Public     any                `json:"public"`  // description of the user or topic
	Trusted    any                `json:"trusted"` // trusted (system-provided) user or topic data
	Private    any                `json:"private"` // per-subscription private data
}

MsgSetDesc is a C2S in set.what == "desc", acc, sub message.

type MsgSetQuery

type MsgSetQuery struct {
	// Topic/user description, new object & new subscriptions only
	Desc *MsgSetDesc `json:"desc"`
	// Subscription parameters
	Sub *MsgSetSub `json:"sub"`
	// Indexable tags for user discovery
	Tags []string `json:"tags"`
	// Update to account credentials.
	Cred *MsgCredClient `json:"cred"`
}

MsgSetQuery is an update to topic or user metadata: description, subscriptions, tags, credentials.

type MsgSetSub

type MsgSetSub struct {
	// User affected by this request. Default (empty): current user
	User string `json:"user"`
	// Access mode change, either Given or Want depending on context
	Mode string `json:"mode"`
}

MsgSetSub is a payload in set.sub request to update current subscription or invite another user, {sub.what} == "sub".

type MsgTopicDesc

type MsgTopicDesc struct {
	CreatedAt *time.Time `json:"created,omitempty"`
	UpdatedAt *time.Time `json:"updated,omitempty"`
	// Timestamp of the last message
	TouchedAt *time.Time `json:"touched,omitempty"`

	// Account state, 'me' topic only.
	State string `json:"state,omitempty"`

	// If the group topic is online.
	Online bool `json:"online,omitempty"`

	// If the topic can be accessed as a channel
	IsChan bool `json:"chan,omitempty"`

	// P2P other user's last online timestamp & user agent
	LastSeen *MsgLastSeenInfo `json:"seen,omitempty"`

	DefaultAcs *MsgDefaultAcsMode `json:"defacs,omitempty"`
	// Actual access mode
	Acs *MsgAccessMode `json:"acs,omitempty"`
	// Max message ID
	SeqId     int `json:"seq,omitempty"`
	ReadSeqId int `json:"read,omitempty"`
	RecvSeqId int `json:"recv,omitempty"`
	// Id of the last delete operation as seen by the requesting user
	DelId   int `json:"clear,omitempty"`
	Public  any `json:"public,omitempty"`
	Trusted any `json:"trusted,omitempty"`
	// Per-subscription private data
	Private any `json:"private,omitempty"`
}

MsgTopicDesc is a topic description, S2C in Meta message.

type MsgTopicSub

type MsgTopicSub struct {

	// Timestamp when the subscription was last updated
	UpdatedAt *time.Time `json:"updated,omitempty"`
	// Timestamp when the subscription was deleted
	DeletedAt *time.Time `json:"deleted,omitempty"`

	// If the subscriber/topic is online
	Online bool `json:"online,omitempty"`

	// Access mode. Topic admins receive the full info, non-admins receive just the cumulative mode
	// Acs.Mode = want & given. The field is not a pointer because at least one value is always assigned.
	Acs MsgAccessMode `json:"acs,omitempty"`
	// ID of the message reported by the given user as read
	ReadSeqId int `json:"read,omitempty"`
	// ID of the message reported by the given user as received
	RecvSeqId int `json:"recv,omitempty"`
	// Topic's public data
	Public any `json:"public,omitempty"`
	// Topic's trusted public data
	Trusted any `json:"trusted,omitempty"`
	// User's own private data per topic
	Private any `json:"private,omitempty"`

	// Uid of the subscribed user
	User string `json:"user,omitempty"`

	// Topic name of this subscription
	Topic string `json:"topic,omitempty"`
	// Timestamp of the last message in the topic.
	TouchedAt *time.Time `json:"touched,omitempty"`
	// ID of the last {data} message in a topic
	SeqId int `json:"seq,omitempty"`
	// Id of the latest Delete operation
	DelId int `json:"clear,omitempty"`

	// Other user's last online timestamp & user agent
	LastSeen *MsgLastSeenInfo `json:"seen,omitempty"`
}

MsgTopicSub is topic subscription details, sent in Meta message.

type Plugin

type Plugin struct {
	// contains filtered or unexported fields
}

Plugin defines client-side parameters of a gRPC plugin.

type PluginFilter

type PluginFilter struct {
	// contains filtered or unexported fields
}

PluginFilter is a enum which defines filtering types.

func ParsePluginFilter

func ParsePluginFilter(s *string, filterBy int) (*PluginFilter, error)

ParsePluginFilter parses filter config string.

type ProxyReqType

type ProxyReqType int

ProxyReqType is the type of proxy requests.

const (
	ProxyReqNone      ProxyReqType = iota
	ProxyReqJoin                   // {sub}.
	ProxyReqLeave                  // {leave}
	ProxyReqMeta                   // {meta set|get}
	ProxyReqBroadcast              // {pub}, {note}
	ProxyReqBgSession
	ProxyReqMeUserAgent
	ProxyReqCall // Used in video call proxy sessions for routing call events.
)

Individual request types.

type Server

type Server struct{}

func NewServer

func NewServer(sa ServerArgs) *Server

func (*Server) InitVideoCalls

func (s *Server) InitVideoCalls(c *config.WebrtcConfig) error

type ServerArgs

type ServerArgs struct {
	Cfg   *config.Config
	Log   *logger.Logger
	Stats *stats.Stats
	Utils *utils.Utils

	ImmutableTagNS map[string]bool
	MaskedTagNS    map[string]bool
	// contains filtered or unexported fields
}

type ServerComMessage

type ServerComMessage struct {
	Ctrl *MsgServerCtrl `json:"ctrl,omitempty"`
	Data *MsgServerData `json:"data,omitempty"`
	Meta *MsgServerMeta `json:"meta,omitempty"`
	Pres *MsgServerPres `json:"pres,omitempty"`
	Info *MsgServerInfo `json:"info,omitempty"`

	// MsgServerData has no Id field, copying it here for use in {ctrl} aknowledgements
	Id string `json:"-"`
	// Routable (expanded) name of the topic.
	RcptTo string `json:"-"`
	// User ID of the sender of the original message.
	AsUser string `json:"-"`
	// Timestamp for consistency of timestamps in {ctrl} messages
	// (corresponds to originating client message receipt timestamp).
	Timestamp time.Time `json:"-"`

	// Session ID to skip when sendng packet to sessions. Used to skip sending to original session.
	// Could be either empty.
	SkipSid string `json:"-"`
	// contains filtered or unexported fields
}

ServerComMessage is a wrapper for server-side messages.

func ErrAPIKeyRequired

func ErrAPIKeyRequired(ts time.Time) *ServerComMessage

ErrAPIKeyRequired valid API key is required (403).

func ErrAlreadyAuthenticated

func ErrAlreadyAuthenticated(id, topic string, ts time.Time) *ServerComMessage

ErrAlreadyAuthenticated invalid attempt to authenticate an already authenticated session Switching users is not supported (409).

func ErrAlreadyExists

func ErrAlreadyExists(id, topic string, ts time.Time) *ServerComMessage

ErrAlreadyExists the object already exists (409).

func ErrAttachFirst

func ErrAttachFirst(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrAttachFirst must attach to topic first in response to a client message (409).

func ErrAuthFailed

func ErrAuthFailed(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrAuthFailed authentication failed with explicit server and incoming request timestamps (401).

func ErrAuthRequired

func ErrAuthRequired(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrAuthRequired authentication required - user must authenticate first (401).

func ErrAuthRequiredReply

func ErrAuthRequiredReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrAuthRequiredReply authentication required - user must authenticate first in response to a client request (401).

func ErrAuthUnknownScheme

func ErrAuthUnknownScheme(id, topic string, ts time.Time) *ServerComMessage

ErrAuthUnknownScheme authentication scheme is unrecognized or invalid (401).

func ErrCallBusyExplicitTs

func ErrCallBusyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrCallBusyExplicitTs indicates a "busy" reply to a video call request (486).

func ErrCallBusyReply

func ErrCallBusyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrCallBusyReply indicates a "busy" reply in response to a video call request (486)

func ErrClusterUnreachableExplicitTs

func ErrClusterUnreachableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrClusterUnreachable in-cluster communication has failed error with explicit server and incoming request timestamps (502).

func ErrClusterUnreachableReply

func ErrClusterUnreachableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrClusterUnreachableReply in-cluster communication has failed error as response to a client request (502).

func ErrCommandOutOfSequence

func ErrCommandOutOfSequence(id, unused string, ts time.Time) *ServerComMessage

ErrCommandOutOfSequence invalid sequence of comments, i.e. attempt to {sub} before {hi} (409).

func ErrDuplicateCredential

func ErrDuplicateCredential(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrDuplicateCredential attempt to create a duplicate credential with explicit server and incoming request timestamps (409).

func ErrGone

func ErrGone(id, topic string, ts time.Time) *ServerComMessage

ErrGone topic deleted or user banned (410).

func ErrInvalidResponse

func ErrInvalidResponse(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrInvalidResponse indicates that the client's response in invalid with explicit server and incoming request timestamps (406).

func ErrLocked

func ErrLocked(id, topic string, ts time.Time) *ServerComMessage

ErrLocked operation rejected because the topic is being deleted (503).

func ErrLockedExplicitTs

func ErrLockedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrLockedExplicitTs operation rejected because the topic is being deleted with explicit server and incoming request timestamps (503).

func ErrLockedReply

func ErrLockedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrLockedReply operation rejected because the topic is being deleted in response to a client request (503).

func ErrMalformed

func ErrMalformed(id, topic string, ts time.Time) *ServerComMessage

ErrMalformed request malformed (400).

func ErrMalformedExplicitTs

func ErrMalformedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrMalformedExplicitTs request malformed with explicit server and incoming request timestamps (400).

func ErrMalformedReply

func ErrMalformedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrMalformedReply request malformed in response to a client request (400).

func ErrNotFound

func ErrNotFound(id, topic string, ts time.Time) *ServerComMessage

ErrNotFound is an error for missing objects other than user or topic (404).

func ErrNotFoundExplicitTs

func ErrNotFoundExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrNotFoundExplicitTs is an error for missing objects other than user or topic with explicit server and incoming request timestamps (404).

func ErrNotFoundReply

func ErrNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrNotFoundReply is an error for missing objects other than user or topic with explicit server and incoming request timestamps in response to a client request (404).

func ErrNotImplemented

func ErrNotImplemented(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrNotImplemented feature not implemented with explicit server and incoming request timestamps (501). TODO: consider changing status code to 4XX.

func ErrNotImplementedReply

func ErrNotImplementedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrNotImplementedReply feature not implemented error in response to a client request (501).

func ErrOperationNotAllowed

func ErrOperationNotAllowed(id, topic string, ts time.Time) *ServerComMessage

ErrOperationNotAllowed a valid operation is not permitted in this context (405).

func ErrOperationNotAllowedExplicitTs

func ErrOperationNotAllowedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrOperationNotAllowedExplicitTs a valid operation is not permitted in this context with explicit server and incoming request timestamps (405).

func ErrOperationNotAllowedReply

func ErrOperationNotAllowedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrOperationNotAllowedReply a valid operation is not permitted in this context with explicit server and incoming request timestamps (405).

func ErrPermissionDenied

func ErrPermissionDenied(id, topic string, ts time.Time) *ServerComMessage

ErrPermissionDenied user is authenticated but operation is not permitted (403).

func ErrPermissionDeniedExplicitTs

func ErrPermissionDeniedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrPermissionDeniedExplicitTs user is authenticated but operation is not permitted with explicit server and incoming request timestamps (403).

func ErrPermissionDeniedReply

func ErrPermissionDeniedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrPermissionDeniedReply user is authenticated but operation is not permitted with explicit server and incoming request timestamps in response to a client request (403).

func ErrPolicy

func ErrPolicy(id, topic string, ts time.Time) *ServerComMessage

ErrPolicy request violates a policy (e.g. password is too weak or too many subscribers) (422).

func ErrPolicyExplicitTs

func ErrPolicyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrPolicyExplicitTs request violates a policy (e.g. password is too weak or too many subscribers) with explicit server and incoming request timestamps (422).

func ErrPolicyReply

func ErrPolicyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrPolicyReply request violates a policy (e.g. password is too weak or too many subscribers) with explicit server and incoming request timestamps in response to a client request (422).

func ErrServiceUnavailableExplicitTs

func ErrServiceUnavailableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrServiceUnavailableExplicitTs server overloaded error with explicit server and incoming request timestamps (503).

func ErrServiceUnavailableReply

func ErrServiceUnavailableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrServiceUnavailableReply server overloaded error in response to a client request (503).

func ErrSessionNotFound

func ErrSessionNotFound(ts time.Time) *ServerComMessage

ErrSessionNotFound valid API key is required (403).

func ErrTooLarge

func ErrTooLarge(id, topic string, ts time.Time) *ServerComMessage

ErrTooLarge packet or request size exceeded the limit (413).

func ErrTopicNotFound

func ErrTopicNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrTopicNotFound topic is not found with explicit server and incoming request timestamps (404).

func ErrTopicNotFoundReply

func ErrTopicNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrTopicNotFoundReply topic is not found with explicit server and incoming request timestamps in response to a client request (404).

func ErrUnknown

func ErrUnknown(id, topic string, ts time.Time) *ServerComMessage

ErrUnknown database or other server error (500).

func ErrUnknownExplicitTs

func ErrUnknownExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrUnknownExplicitTs database or other server error with explicit server and incoming request timestamps (500).

func ErrUnknownReply

func ErrUnknownReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrUnknownReply database or other server error in response to a client request (500).

func ErrUserNotFound

func ErrUserNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

ErrUserNotFound user is not found with explicit server and incoming request timestamps (404).

func ErrUserNotFoundReply

func ErrUserNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

ErrUserNotFoundReply user is not found with explicit server and incoming request timestamps in response to a client request (404).

func ErrVersionNotSupported

func ErrVersionNotSupported(id string, ts time.Time) *ServerComMessage

ErrVersionNotSupported invalid (too low) protocol version (505).

func InfoAlreadySubscribed

func InfoAlreadySubscribed(id, topic string, ts time.Time) *ServerComMessage

InfoAlreadySubscribed response means request to subscribe was ignored because user is already subscribed (304).

func InfoAuthReset

func InfoAuthReset(id string, ts time.Time) *ServerComMessage

InfoAuthReset is sent in response to request to reset authentication when it was completed but login was not performed (301).

func InfoChallenge

func InfoChallenge(id string, ts time.Time, challenge []byte) *ServerComMessage

InfoChallenge requires user to respond to presented challenge before login can be completed (300).

func InfoFound

func InfoFound(id, topic string, ts time.Time) *ServerComMessage

InfoFound redirects to a new resource (307).

func InfoNoAction

func InfoNoAction(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

InfoNoAction response means request was ignored because the object was already in the desired state with explicit server and incoming request timestamps (304).

func InfoNoActionReply

func InfoNoActionReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

InfoNoActionReply response means request was ignored because the object was already in the desired state in response to a client request (304).

func InfoNotJoined

func InfoNotJoined(id, topic string, ts time.Time) *ServerComMessage

InfoNotJoined response means request to leave was ignored because user was not subscribed (304).

func InfoNotModified

func InfoNotModified(id, topic string, ts time.Time) *ServerComMessage

InfoNotModified response means update request was a noop (304).

func InfoNotModifiedExplicitTs

func InfoNotModifiedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

InfoNotModifiedExplicitTs response means update request was a noop with explicit server and incoming request timestamps (304).

func InfoNotModifiedReply

func InfoNotModifiedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

InfoNotModifiedReply response means update request was a noop in response to a client request (304).

func InfoUseOther

func InfoUseOther(id, topic, other string, serverTs, incomingReqTs time.Time) *ServerComMessage

InfoUseOther is a response to a subscription request redirecting client to another topic (303).

func InfoUseOtherReply

func InfoUseOtherReply(msg *ClientComMessage, other string, ts time.Time) *ServerComMessage

InfoUseOtherReply is a response to a subscription request redirecting client to another topic (303).

func InfoValidateCredentials

func InfoValidateCredentials(id string, ts time.Time) *ServerComMessage

InfoValidateCredentials requires user to confirm credentials before going forward (300).

func InfoValidateCredentialsExplicitTs

func InfoValidateCredentialsExplicitTs(id string, serverTs, incomingReqTs time.Time) *ServerComMessage

InfoValidateCredentialsExplicitTs requires user to confirm credentials before going forward with explicit server and incoming request timestamps (300).

func NoContentParams

func NoContentParams(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage

NoContentParams indicates request was processed but resulted in no content (204).

func NoContentParamsReply

func NoContentParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage

NoContentParamsReply indicates request was processed but resulted in no content in response to a client request (204).

func NoErr

func NoErr(id, topic string, ts time.Time) *ServerComMessage

NoErr indicates successful completion (200).

func NoErrAccepted

func NoErrAccepted(id, topic string, ts time.Time) *ServerComMessage

NoErrAccepted indicates request was accepted but not processed yet (202).

func NoErrAcceptedExplicitTs

func NoErrAcceptedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

NoErrAcceptedExplicitTs indicates request was accepted but not processed yet with explicit server and incoming request timestamps (202).

func NoErrCreated

func NoErrCreated(id, topic string, ts time.Time) *ServerComMessage

NoErrCreated indicated successful creation of an object (201).

func NoErrDeliveredParams

func NoErrDeliveredParams(id, topic string, ts time.Time, params any) *ServerComMessage

NoErrDeliveredParams means requested content has been delivered (208).

func NoErrEvicted

func NoErrEvicted(id, topic string, ts time.Time) *ServerComMessage

NoErrEvicted indicates that the user was disconnected from topic for no fault of the user (205).

func NoErrExplicitTs

func NoErrExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage

NoErrExplicitTs indicates successful completion with explicit server and incoming request timestamps (200).

func NoErrParams

func NoErrParams(id, topic string, ts time.Time, params any) *ServerComMessage

NoErrParams indicates successful completion with additional parameters (200).

func NoErrParamsExplicitTs

func NoErrParamsExplicitTs(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage

NoErrParamsExplicitTs indicates successful completion with additional parameters and explicit server and incoming request timestamps (200).

func NoErrParamsReply

func NoErrParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage

NoErrParamsReply indicates successful completion with additional parameters and explicit server and incoming request timestamps (200).

func NoErrReply

func NoErrReply(msg *ClientComMessage, ts time.Time) *ServerComMessage

NoErrReply indicates successful completion as a reply to a client message (200).

func NoErrShutdown

func NoErrShutdown(ts time.Time) *ServerComMessage

NoErrShutdown means user was disconnected from topic because system shutdown is in progress (205).

func (*ServerComMessage) Describe

func (src *ServerComMessage) Describe() string

type Session

type Session struct {
	// contains filtered or unexported fields
}

type SessionArgs

type SessionArgs struct {
	Lifetime time.Duration
	Stats    *stats.Stats
	UGen     *st.UidGenerator
}

type SessionProto

type SessionProto int

SessionProto is the type of the wire transport.

const (
	// NONE is undefined/not set.
	NONE SessionProto = iota
	// WEBSOCK represents websocket connection.
	WEBSOCK
	// LPOLL represents a long polling connection.
	LPOLL
	// GRPC is a gRPC connection
	GRPC
	// PROXY is temporary session used as a proxy at master node.
	PROXY
	// MULTIPLEX is a multiplexing session representing a connection from proxy topic to master.
	MULTIPLEX
)

Constants defining individual types of wire transports.

type SessionStore

type SessionStore struct {
	// contains filtered or unexported fields
}

SessionStore holds live sessions. Long polling sessions are stored in a linked list with

most recent sessions on top. In addition all sessions are stored in a map indexed by session ID.

func NewSessionStore

func NewSessionStore(sa SessionArgs) *SessionStore

NewSessionStore initializes a session store.

func (*SessionStore) Delete

func (ss *SessionStore) Delete(s *Session)

Delete removes session from store.

func (*SessionStore) EvictUser

func (ss *SessionStore) EvictUser(uid types.Uid, skipSid string)

EvictUser terminates all sessions of a given user.

func (*SessionStore) Get

func (ss *SessionStore) Get(sid string) *Session

Get fetches a session from store by session ID.

func (*SessionStore) NewSession

func (ss *SessionStore) NewSession(conn any, sid string) (*Session, int)

NewSession creates a new session and saves it to the session store.

func (*SessionStore) NodeRestarted

func (ss *SessionStore) NodeRestarted(nodeName string, fingerprint int64)

NodeRestarted removes stale sessions from a restarted cluster node.

  • nodeName is the name of affected node
  • fingerprint is the new fingerprint of the node.

func (*SessionStore) Range

func (ss *SessionStore) Range(f func(sid string, s *Session) bool)

Range calls given function for all sessions. It stops if the function returns false.

func (*SessionStore) Shutdown

func (ss *SessionStore) Shutdown()

Shutdown terminates sessionStore. No need to clean up. Don't send to clustered sessions, their servers are not being shut down.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is a mapper of sessions to topics.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic is an isolated communication channel

type UserCacheReq

type UserCacheReq struct {
	// Name of the node sending this request in case of cluster. Not set otherwise.
	Node string

	// UserId is set when count of unread messages is updated for a single user or
	// when the user is being deleted.
	UserId types.Uid
	// UserIdList  is set when subscription count is updated for users of a topic.
	UserIdList []types.Uid
	// Unread count (UserId is set)
	Unread int

	// In case of set UserId: treat Unread count as an increment as opposite to the final value.
	// In case of set UserIdList: intement (Inc == true) or decrement subscription count by one.
	Inc bool
	// User is being deleted, remove user from cache.
	Gone bool

	// Optional push notification
	PushRcpt *pt.Receipt
}

UserCacheReq contains data which mutates one or more user cache entries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL