Documentation ¶
Overview ¶
Description:
Topic in a cluster which serves as a local representation of the master
topic hosted at another node.
Index ¶
- Constants
- func Init(sa ServerArgs)
- func InitUsersGarbageCollection(period time.Duration, blockSize, minAccountAgeHours int) chan<- bool
- func NewCluster(ca ClusterArgs) (int, error)
- type ClientComMessage
- type Cluster
- func (c *Cluster) Health(health *ClusterHealth, unused *bool) error
- func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error
- func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error
- func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error
- func (Cluster) TopicProxy(msg *ClusterResp, unused *bool) error
- func (c *Cluster) UserCacheUpdate(msg *UserCacheReq, rejected *bool) error
- func (c *Cluster) Vote(vreq *ClusterVoteRequest, response *ClusterVoteResponse) error
- type ClusterArgs
- type ClusterFailover
- type ClusterHealth
- type ClusterNode
- type ClusterPing
- type ClusterReq
- type ClusterResp
- type ClusterRoute
- type ClusterSess
- type ClusterSessUpdate
- type ClusterVote
- type ClusterVoteRequest
- type ClusterVoteResponse
- type CredValidator
- type Hub
- type MsgAccessMode
- type MsgClientAcc
- type MsgClientDel
- type MsgClientExtra
- type MsgClientGet
- type MsgClientHi
- type MsgClientLeave
- type MsgClientLogin
- type MsgClientNote
- type MsgClientPub
- type MsgClientSet
- type MsgClientSub
- type MsgCredClient
- type MsgCredServer
- type MsgDefaultAcsMode
- type MsgDelRange
- type MsgDelValues
- type MsgGetOpts
- type MsgGetQuery
- type MsgLastSeenInfo
- type MsgServerCtrl
- type MsgServerData
- type MsgServerInfo
- type MsgServerMeta
- type MsgServerPres
- type MsgSetDesc
- type MsgSetQuery
- type MsgSetSub
- type MsgTopicDesc
- type MsgTopicSub
- type Plugin
- type PluginFilter
- type ProxyReqType
- type Server
- type ServerArgs
- type ServerComMessage
- func ErrAPIKeyRequired(ts time.Time) *ServerComMessage
- func ErrAlreadyAuthenticated(id, topic string, ts time.Time) *ServerComMessage
- func ErrAlreadyExists(id, topic string, ts time.Time) *ServerComMessage
- func ErrAttachFirst(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrAuthFailed(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrAuthRequired(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrAuthRequiredReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrAuthUnknownScheme(id, topic string, ts time.Time) *ServerComMessage
- func ErrCallBusyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrCallBusyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrClusterUnreachableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrClusterUnreachableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrCommandOutOfSequence(id, unused string, ts time.Time) *ServerComMessage
- func ErrDuplicateCredential(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrGone(id, topic string, ts time.Time) *ServerComMessage
- func ErrInvalidResponse(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrLocked(id, topic string, ts time.Time) *ServerComMessage
- func ErrLockedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrLockedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrMalformed(id, topic string, ts time.Time) *ServerComMessage
- func ErrMalformedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrMalformedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrNotFound(id, topic string, ts time.Time) *ServerComMessage
- func ErrNotFoundExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrNotImplemented(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrNotImplementedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrOperationNotAllowed(id, topic string, ts time.Time) *ServerComMessage
- func ErrOperationNotAllowedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrOperationNotAllowedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrPermissionDenied(id, topic string, ts time.Time) *ServerComMessage
- func ErrPermissionDeniedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrPermissionDeniedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrPolicy(id, topic string, ts time.Time) *ServerComMessage
- func ErrPolicyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrPolicyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrServiceUnavailableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrServiceUnavailableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrSessionNotFound(ts time.Time) *ServerComMessage
- func ErrTooLarge(id, topic string, ts time.Time) *ServerComMessage
- func ErrTopicNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrTopicNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrUnknown(id, topic string, ts time.Time) *ServerComMessage
- func ErrUnknownExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrUnknownReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrUserNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func ErrUserNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func ErrVersionNotSupported(id string, ts time.Time) *ServerComMessage
- func InfoAlreadySubscribed(id, topic string, ts time.Time) *ServerComMessage
- func InfoAuthReset(id string, ts time.Time) *ServerComMessage
- func InfoChallenge(id string, ts time.Time, challenge []byte) *ServerComMessage
- func InfoFound(id, topic string, ts time.Time) *ServerComMessage
- func InfoNoAction(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func InfoNoActionReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func InfoNotJoined(id, topic string, ts time.Time) *ServerComMessage
- func InfoNotModified(id, topic string, ts time.Time) *ServerComMessage
- func InfoNotModifiedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func InfoNotModifiedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func InfoUseOther(id, topic, other string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func InfoUseOtherReply(msg *ClientComMessage, other string, ts time.Time) *ServerComMessage
- func InfoValidateCredentials(id string, ts time.Time) *ServerComMessage
- func InfoValidateCredentialsExplicitTs(id string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func NoContentParams(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage
- func NoContentParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage
- func NoErr(id, topic string, ts time.Time) *ServerComMessage
- func NoErrAccepted(id, topic string, ts time.Time) *ServerComMessage
- func NoErrAcceptedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func NoErrCreated(id, topic string, ts time.Time) *ServerComMessage
- func NoErrDeliveredParams(id, topic string, ts time.Time, params any) *ServerComMessage
- func NoErrEvicted(id, topic string, ts time.Time) *ServerComMessage
- func NoErrExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
- func NoErrParams(id, topic string, ts time.Time, params any) *ServerComMessage
- func NoErrParamsExplicitTs(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage
- func NoErrParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage
- func NoErrReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
- func NoErrShutdown(ts time.Time) *ServerComMessage
- type Session
- type SessionArgs
- type SessionProto
- type SessionStore
- func (ss *SessionStore) Delete(s *Session)
- func (ss *SessionStore) EvictUser(uid types.Uid, skipSid string)
- func (ss *SessionStore) Get(sid string) *Session
- func (ss *SessionStore) NewSession(conn any, sid string) (*Session, int)
- func (ss *SessionStore) NodeRestarted(nodeName string, fingerprint int64)
- func (ss *SessionStore) Range(f func(sid string, s *Session) bool)
- func (ss *SessionStore) Shutdown()
- type Subscription
- type Topic
- type UserCacheReq
Constants ¶
const ( // StopNone no reason given/default. StopNone = iota // StopShutdown terminated due to system shutdown. StopShutdown // StopDeleted terminated due to being deleted. StopDeleted // StopRehashing terminated due to cluster rehashing (moved to a different node). StopRehashing )
Reasons why topic is being shut down.
Variables ¶
This section is empty.
Functions ¶
func InitUsersGarbageCollection ¶
func InitUsersGarbageCollection(period time.Duration, blockSize, minAccountAgeHours int) chan<- bool
InitUsersGarbageCollection() runs every 'period' and deletes up to 'blockSize'
stale un-validated user accounts which were last updated at least 'minAccountAgeHours' hours ago.
Returns channel which can be used to stop the process.
func NewCluster ¶
func NewCluster(ca ClusterArgs) (int, error)
NewCluster() returns snowflake worker id (pass nil if you don't want to use cluster).
Cluster won't be started here yet.
Types ¶
type ClientComMessage ¶
type ClientComMessage struct { Hi *MsgClientHi `json:"hi"` Acc *MsgClientAcc `json:"acc"` Login *MsgClientLogin `json:"login"` Sub *MsgClientSub `json:"sub"` Leave *MsgClientLeave `json:"leave"` Pub *MsgClientPub `json:"pub"` Get *MsgClientGet `json:"get"` Set *MsgClientSet `json:"set"` Del *MsgClientDel `json:"del"` Note *MsgClientNote `json:"note"` // Optional data. Extra *MsgClientExtra `json:"extra"` // Message Id de-normalized Id string `json:"-"` // Un-routable (original) topic name de-normalized from XXX.Topic. Original string `json:"-"` // Routable (expanded) topic name. RcptTo string `json:"-"` // Sender's UserId as string. AsUser string `json:"-"` // Sender's authentication level. AuthLvl int `json:"-"` // De-normalized 'what' field of meta messages (set, get, del). MetaWhat int `json:"-"` // Timestamp when this message was received by the server. Timestamp time.Time `json:"-"` // contains filtered or unexported fields }
ClientComMessage is a wrapper for client messages.
type Cluster ¶
type Cluster struct {
// contains filtered or unexported fields
}
Cluster is the representation of the cluster.
func (*Cluster) Health ¶
func (c *Cluster) Health(health *ClusterHealth, unused *bool) error
Health is called by the leader node to assert leadership and check status of the followers.
func (*Cluster) Ping ¶
func (c *Cluster) Ping(ping *ClusterPing, unused *bool) error
Ping is a gRPC endpoint which receives ping requests from peer nodes. Used to detect node restarts.
func (*Cluster) Route ¶
func (c *Cluster) Route(msg *ClusterRoute, rejected *bool) error
Route endpoint receives intra-cluster messages destined for the nodes hosting the topic. Called by Hub.route channel consumer for messages sent without attaching to topic first.
func (*Cluster) TopicMaster ¶
func (c *Cluster) TopicMaster(msg *ClusterReq, rejected *bool) error
TopicMaster is a gRPC endpoint which receives requests sent by proxy topic to master topic.
func (Cluster) TopicProxy ¶
func (Cluster) TopicProxy(msg *ClusterResp, unused *bool) error
TopicProxy is a gRPC endpoint at topic proxy which receives topic master responses.
func (*Cluster) UserCacheUpdate ¶
func (c *Cluster) UserCacheUpdate(msg *UserCacheReq, rejected *bool) error
UserCacheUpdate endpoint receives updates to user's cached values as well as sends push notifications.
func (*Cluster) Vote ¶
func (c *Cluster) Vote(vreq *ClusterVoteRequest, response *ClusterVoteResponse) error
Vote processes request for a vote from a candidate.
type ClusterArgs ¶
type ClusterFailover ¶
type ClusterFailover struct {
// contains filtered or unexported fields
}
Failover config.
type ClusterHealth ¶
type ClusterHealth struct { // Name of the leader node Leader string // Election term Term int // Ring hash signature that represents the cluster Signature string // Names of nodes currently active in the cluster Nodes []string }
ClusterHealth is content of a leader's health check of a follower node.
type ClusterNode ¶
type ClusterNode struct {
// contains filtered or unexported fields
}
ClusterNode is a client's connection to another node.
type ClusterPing ¶
type ClusterPing struct { // Name of the node sending this request. Node string // Fingerprint of the node sending this request. // Fingerprint changes when the node restarts. Fingerprint int64 }
ClusterPing is used to detect node restarts.
type ClusterReq ¶
type ClusterReq struct { // Name of the Node sending this request Node string // Ring hash Signature of the node sending this request Signature must match // the Signature of the receiver, otherwise the Cluster is desynchronized. Signature string // Fingerprint of the node sending this request. // Fingerprint changes when the node is restarted. Fingerprint int64 // Type of request. ReqType ProxyReqType // Client message. Set for C2S requests. CliMsg *ClientComMessage // Message to be routed. Set for intra-cluster route requests. SrvMsg *ServerComMessage // Expanded (routable) topic name RcptTo string // Originating session Sess *ClusterSess // True when the topic proxy is Gone. Gone bool }
ClusterReq is either a Proxy to Master or Topic Proxy to Topic Master or intra-cluster routing request message.
type ClusterResp ¶
type ClusterResp struct { // Server message with the response. SrvMsg *ServerComMessage // Originating session ID to forward response to, if any. OrigSid string // Expanded (routable) topic name RcptTo string // Original request type. OrigReqType ProxyReqType }
ClusterResp is a Master to Proxy response message.
type ClusterRoute ¶
type ClusterRoute struct { // Name of the node sending this request Node string // Ring hash signature of the node sending this request // Signature must match the signature of the receiver, otherwise the // Cluster is desynchronized. Signature string // Fingerprint of the node sending this request. // Fingerprint changes when the node is restarted. Fingerprint int64 // Message to be routed. Set for intra-cluster route requests. SrvMsg *ServerComMessage // Originating session Sess *ClusterSess }
ClusterRoute is intra-cluster routing request message.
type ClusterSess ¶
type ClusterSess struct { // IP address of the client. For long polling this is the IP of the last poll RemoteAddr string // User agent, a string provided by an authenticated client in {login} packet UserAgent string // ID of the current user or 0 Uid types.Uid // User's authentication level AuthLvl at.Level // Protocol version of the client: ((major & 0xff) << 8) | (minor & 0xff) Ver int // Human language of the client Lang string // Country of the client CountryCode string // Device ID DeviceID string // Device Platform: "web", "ios", "android" Platform string // Session ID Sid string // Background session Background bool }
ClusterSess is a basic info on a remote session where the message was created.
type ClusterSessUpdate ¶
type ClusterSessUpdate struct { // User this session represents. Uid types.Uid // Session id. Sid string // Session user agent. UserAgent string }
ClusterSessUpdate represents a request to update a session: either the User Agent has changed or a background session has come to the foreground.
type ClusterVote ¶
type ClusterVote struct {
// contains filtered or unexported fields
}
ClusterVote is a vote request and a response in leader election.
type ClusterVoteRequest ¶
type ClusterVoteRequest struct { // Candidate node which issued this request Node string // Election term Term int }
ClusterVoteRequest is a request from a leader candidate to a node to vote for the candidate.
type ClusterVoteResponse ¶
type ClusterVoteResponse struct { // Actual vote Result bool // Node's term after the vote Term int }
ClusterVoteResponse is a vote from a node.
type CredValidator ¶
type CredValidator struct { // AuthLevel(s) which require this validator. RequiredAuthLvl []types.Level AddToTags bool }
CredValidator holds additional config params for a credential validator.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub is the core structure which holds topics.
type MsgAccessMode ¶
type MsgAccessMode struct { // Access mode requested by the user Want string // Access mode granted to the user by the admin Given string // Cumulative access mode want & given Mode string }
MsgAccessMode is a definition of access mode.
type MsgClientAcc ¶
type MsgClientAcc struct { // message id Id string // "newXYZ" to create a new user or UserId to update a user; default: current user. User string // Temporary authentication parameters for one-off actions, like password reset. TmpScheme string TmpSecret []byte // Account state: normal, suspended. State string // Authentication level of the user when UserID is set and not equal to the current user. // Either "", "auth" or "anon". Default: "" AuthLevel string // The initial authentication scheme the account can use Scheme string // Shared secret Secret []byte // Authenticate session with the newly created account Login bool // Indexable tags for user discovery Tags []string // User initialization data when creating a new user, otherwise ignored Desc *MsgSetDesc // Credentials to verify (email or phone or captcha) Cred []MsgCredClient }
MsgClientAcc is an {acc} message for creating or updating a user account.
type MsgClientDel ¶
type MsgClientDel struct { Id string `json:"id,omitempty"` Topic string `json:"topic,omitempty"` // What to delete: // * "msg" to delete messages (default) // * "topic" to delete the topic // * "sub" to delete a subscription to topic. // * "user" to delete or disable user. // * "cred" to delete credential (email or phone) What string `json:"what"` // Delete messages with these IDs (either one by one or a set of ranges) DelSeq []MsgDelRange `json:"del_seq"` // User ID of the user or subscription to delete User string `json:"user"` // Credential to delete Cred *MsgCredClient `json:"cred"` // Request to hard-delete objects (i.e. delete messages for all users), if such option is available. Hard bool `json:"hard"` }
MsgClientDel delete messages or topic {del}.
type MsgClientExtra ¶
type MsgClientExtra struct { // Array of out-of-band attachments which have to be exempted from GC. Attachments []string `json:"attachments,omitempty"` // Alternative user ID set by the root user (obo = On Behalf Of). AsUser string `json:"obo,omitempty"` // Altered authentication level set by the root user. AuthLevel string `json:"authlevel,omitempty"` }
MsgClientExtra is not a stand-alone message but extra data which augments the main payload.
type MsgClientGet ¶
type MsgClientGet struct { Id string `json:"id,omitempty"` Topic string `json:"topic"` MsgGetQuery }
MsgClientGet is a query of topic state {get}.
type MsgClientHi ¶
type MsgClientHi struct { Id string `json:"id"` UserAgent string `json:"user_agent"` Version string `json:"version"` // Protocol version, i.e. "0.13" DeviceID string `json:"device_id"` // Client's unique device ID Lang string `json:"lang"` // ISO 639-1 human language of the connected device Platform string `json:"platform"` // Platform code: ios, android, web. // Session is initially in non-interactive mode, i.e. issued by a service. Presence notifications are delayed. Background bool `json:"background"` }
MsgClientHi is a handshake {hi} message.
type MsgClientLeave ¶
type MsgClientLeave struct { Id string `json:"id"` Topic string `json:"topic"` Unsub bool `json:"unsub"` }
MsgClientLeave is an unsubscribe {leave} request message.
type MsgClientLogin ¶
type MsgClientLogin struct { // message id Id string // Authentication scheme Scheme string // Shared secret Secret []byte // Credentials being verified (email or phone or captcha etc.) Cred []MsgCredClient }
MsgClientLogin is a login {login} message.
type MsgClientNote ¶
type MsgClientNote struct { // There is no Id -- server will not acknowledge {ping} packets, they are "fire and forget" Topic string `json:"topic"` // what is being reported: "recv" - message received, "read" - message read, "kp" - typing notification What string `json:"what"` // Server-issued message ID being reported SeqId int `json:"seq,omitempty"` // Client's count of unread messages to report back to the server. Used in push notifications on iOS. Unread int `json:"unread,omitempty"` // Call event. Event string `json:"event,omitempty"` // Arbitrary json payload (used in video calls). Payload json.RawMessage `json:"payload,omitempty"` }
MsgClientNote is a client-generated notification for topic subscribers {note}.
type MsgClientPub ¶
type MsgClientPub struct { Id string `json:"id"` Topic string `json:"topic"` NoEcho bool `json:"no_echo"` Head map[string]any `json:"head"` Content any `json:"content"` }
MsgClientPub is client's request to publish data to topic subscribers {pub}.
type MsgClientSet ¶
type MsgClientSet struct { Id string `json:"id,omitempty"` Topic string `json:"topic"` MsgSetQuery }
MsgClientSet is an update of topic state {set}.
type MsgClientSub ¶
type MsgClientSub struct { Id string `json:"id"` Topic string `json:"topic"` // Mirrors {set}. Set *MsgSetQuery `json:"set"` // Mirrors {get}. Get *MsgGetQuery `json:"get"` // True if this subscription created a new topic. In case of p2p topics, it's true if the other // user's subscription was created (as a part of new topic creation or just alone). Created bool `json:"-"` // True if this is a new subscription. Newsub bool `json:"-"` }
MsgClientSub is a subscription request {sub} message.
type MsgCredClient ¶
type MsgCredClient struct { // Credential type, i.e. `email` or `tel`. Method string // Value to verify, i.e. `user@example.com` or `+18003287448` Value string // Verification response Response string // Request parameters, such as preferences. Passed to validator without interpretation. Params map[string]any }
MsgCredClient is an account credential such as email or phone number.
type MsgCredServer ¶
type MsgCredServer struct { // Credential type, i.e. `email` or `tel`. Method string `json:"meth,omitempty"` // Credential value, i.e. `user@example.com` or `+18003287448` Value string `json:"val,omitempty"` // Indicates that the credential is validated. Done bool `json:"done,omitempty"` }
MsgCredServer is an account credential such as email or phone number.
type MsgDefaultAcsMode ¶
type MsgDefaultAcsMode struct { Auth string `mapstructure:"auth"` Anon string `mapstructure:"anon"` }
MsgDefaultAcsMode is a C2S in set.what == "desc", acc, sub message.
type MsgDelRange ¶
MsgDelRange is either an individual ID (HiId=0) or a range of deleted IDs,
low end inclusive (closed), high-end exclusive (open): [LowId .. HiId), e.g. 1..5 -> 1, 2, 3, 4.
type MsgDelValues ¶
type MsgDelValues struct { DelId int `json:"clear,omitempty"` DelSeq []MsgDelRange `json:"delseq,omitempty"` }
MsgDelValues describes request to delete messages.
type MsgGetOpts ¶
type MsgGetOpts struct { // Optional User ID to return result(s) for one user. User string `json:"user"` // Optional topic name to return result(s) for one topic. Topic string `json:"topic"` // Return results modified since this timestamp. IfModifiedSince *time.Time `json:"if_modified_since"` // Load messages/ranges with IDs equal or greater than this (inclusive or closed) SinceId int `json:"since_id"` // Load messages/ranges with IDs lower than this (exclusive or open) BeforeId int `json:"before_id"` // Limit the number of messages loaded Limit int `json:"limit"` }
MsgGetOpts defines Get query parameters.
type MsgGetQuery ¶
type MsgGetQuery struct { What string // Parameters of "desc" request: IfModifiedSince Desc *MsgGetOpts `json:"desc,omitempty"` // Parameters of "sub" request: User, Topic, IfModifiedSince, Limit. Sub *MsgGetOpts `json:"sub,omitempty"` // Parameters of "data" request: Since, Before, Limit. Data *MsgGetOpts `json:"data,omitempty"` // Parameters of "del" request: Since, Before, Limit. Del *MsgGetOpts `json:"del,omitempty"` }
MsgGetQuery is a topic metadata or data query.
type MsgLastSeenInfo ¶
type MsgLastSeenInfo struct { // Timestamp of user's last appearance online. When *time.Time `json:"when,omitempty"` // User agent of the device when the user was last online. UserAgent string `json:"ua,omitempty"` }
MsgLastSeenInfo contains info on user's appearance online - when & user agent.
type MsgServerCtrl ¶
type MsgServerCtrl struct { Id string `json:"id,omitempty"` Topic string `json:"topic,omitempty"` Params any `json:"params,omitempty"` Code int `json:"code"` Text string `json:"text,omitempty"` Timestamp time.Time `json:"ts"` }
MsgServerCtrl is a server control message {ctrl}.
type MsgServerData ¶
type MsgServerData struct { Topic string `json:"topic"` // ID of the user who originated the message as {pub}, could be empty if sent by the system From string `json:"from,omitempty"` Timestamp time.Time `json:"ts"` DeletedAt *time.Time `json:"deleted,omitempty"` SeqId int `json:"seq"` Head map[string]any `json:"head,omitempty"` Content any `json:"content"` }
MsgServerData is a server {data} message.
type MsgServerInfo ¶
type MsgServerInfo struct { // Topic to send event to. Topic string `json:"topic"` // Topic where the event has occurred (set only when Topic='me'). Src string `json:"src,omitempty"` // ID of the user who originated the message. From string `json:"from,omitempty"` // The event being reported: "rcpt" - message received, "read" - message read, "kp" - typing notification, "call" - video call. What string `json:"what"` // Server-issued message ID being reported. SeqId int `json:"seq,omitempty"` // Call event. Event string `json:"event,omitempty"` // Arbitrary json payload (used by video calls). Payload json.RawMessage `json:"payload,omitempty"` // When sending to 'me', skip sessions subscribed to this topic. SkipTopic string `json:"-"` }
MsgServerInfo is the server-side copy of MsgClientNote with From and optionally Src added (non-authoritative).
type MsgServerMeta ¶
type MsgServerMeta struct { Id string `json:"id,omitempty"` Topic string `json:"topic"` Timestamp *time.Time `json:"ts,omitempty"` // Topic description Desc *MsgTopicDesc `json:"desc,omitempty"` // Subscriptions as an array of objects Sub []MsgTopicSub `json:"sub,omitempty"` // Delete ID and the ranges of IDs of deleted messages Del *MsgDelValues `json:"del,omitempty"` // User discovery tags Tags []string `json:"tags,omitempty"` // Account credentials, 'me' only. Cred []*MsgCredServer `json:"cred,omitempty"` }
MsgServerMeta is a topic metadata {meta} update.
type MsgServerPres ¶
type MsgServerPres struct { Topic string `json:"topic"` Src string `json:"src,omitempty"` What string `json:"what"` UserAgent string `json:"ua,omitempty"` SeqId int `json:"seq,omitempty"` DelId int `json:"clear,omitempty"` DelSeq []MsgDelRange `json:"delseq,omitempty"` AcsTarget string `json:"tgt,omitempty"` AcsActor string `json:"act,omitempty"` // Acs or a delta Acs. Need to marshal it to json under a name different than 'acs' // to allow different handling on the client Acs *MsgAccessMode `json:"dacs,omitempty"` // Flag to break the reply loop WantReply bool `json:"-"` // Additional access mode filters when sending to topic's online members. Both filter conditions must be true. // send only to those who have this access mode. FilterIn int `json:"-"` // skip those who have this access mode. FilterOut int `json:"-"` // When sending to 'me', skip sessions subscribed to this topic. SkipTopic string `json:"-"` // Send to sessions of a single user only. SingleUser string `json:"-"` // Exclude sessions of a single user. ExcludeUser string `json:"-"` }
MsgServerPres is presence notification {pres} (authoritative update).
type MsgSetDesc ¶
type MsgSetDesc struct { DefaultAcs *MsgDefaultAcsMode `json:"dfm"` // default access mode Public any `json:"public"` // description of the user or topic Trusted any `json:"trusted"` // trusted (system-provided) user or topic data Private any `json:"private"` // per-subscription private data }
MsgSetDesc is a C2S in set.what == "desc", acc, sub message.
type MsgSetQuery ¶
type MsgSetQuery struct { // Topic/user description, new object & new subscriptions only Desc *MsgSetDesc `json:"desc"` // Subscription parameters Sub *MsgSetSub `json:"sub"` // Indexable tags for user discovery Tags []string `json:"tags"` // Update to account credentials. Cred *MsgCredClient `json:"cred"` }
MsgSetQuery is an update to topic or user metadata: description, subscriptions, tags, credentials.
type MsgSetSub ¶
type MsgSetSub struct { // User affected by this request. Default (empty): current user User string `json:"user"` // Access mode change, either Given or Want depending on context Mode string `json:"mode"` }
MsgSetSub is a payload in set.sub request to update current subscription or invite another user, {sub.what} == "sub".
type MsgTopicDesc ¶
type MsgTopicDesc struct { CreatedAt *time.Time `json:"created,omitempty"` UpdatedAt *time.Time `json:"updated,omitempty"` // Timestamp of the last message TouchedAt *time.Time `json:"touched,omitempty"` // Account state, 'me' topic only. State string `json:"state,omitempty"` // If the group topic is online. Online bool `json:"online,omitempty"` // If the topic can be accessed as a channel IsChan bool `json:"chan,omitempty"` // P2P other user's last online timestamp & user agent LastSeen *MsgLastSeenInfo `json:"seen,omitempty"` DefaultAcs *MsgDefaultAcsMode `json:"defacs,omitempty"` // Actual access mode Acs *MsgAccessMode `json:"acs,omitempty"` // Max message ID SeqId int `json:"seq,omitempty"` ReadSeqId int `json:"read,omitempty"` RecvSeqId int `json:"recv,omitempty"` // Id of the last delete operation as seen by the requesting user DelId int `json:"clear,omitempty"` Public any `json:"public,omitempty"` Trusted any `json:"trusted,omitempty"` // Per-subscription private data Private any `json:"private,omitempty"` }
MsgTopicDesc is a topic description, S2C in Meta message.
type MsgTopicSub ¶
type MsgTopicSub struct { // Timestamp when the subscription was last updated UpdatedAt *time.Time `json:"updated,omitempty"` // Timestamp when the subscription was deleted DeletedAt *time.Time `json:"deleted,omitempty"` // If the subscriber/topic is online Online bool `json:"online,omitempty"` // Access mode. Topic admins receive the full info, non-admins receive just the cumulative mode // Acs.Mode = want & given. The field is not a pointer because at least one value is always assigned. Acs MsgAccessMode `json:"acs,omitempty"` // ID of the message reported by the given user as read ReadSeqId int `json:"read,omitempty"` // ID of the message reported by the given user as received RecvSeqId int `json:"recv,omitempty"` // Topic's public data Public any `json:"public,omitempty"` // Topic's trusted public data Trusted any `json:"trusted,omitempty"` // User's own private data per topic Private any `json:"private,omitempty"` // Uid of the subscribed user User string `json:"user,omitempty"` // Topic name of this subscription Topic string `json:"topic,omitempty"` // Timestamp of the last message in the topic. TouchedAt *time.Time `json:"touched,omitempty"` // ID of the last {data} message in a topic SeqId int `json:"seq,omitempty"` // Id of the latest Delete operation DelId int `json:"clear,omitempty"` // Other user's last online timestamp & user agent LastSeen *MsgLastSeenInfo `json:"seen,omitempty"` }
MsgTopicSub is topic subscription details, sent in Meta message.
type Plugin ¶
type Plugin struct {
// contains filtered or unexported fields
}
Plugin defines client-side parameters of a gRPC plugin.
type PluginFilter ¶
type PluginFilter struct {
// contains filtered or unexported fields
}
PluginFilter is an enum which defines filtering types.
func ParsePluginFilter ¶
func ParsePluginFilter(s *string, filterBy int) (*PluginFilter, error)
ParsePluginFilter parses filter config string.
type ProxyReqType ¶
type ProxyReqType int
ProxyReqType is the type of proxy requests.
const ( ProxyReqNone ProxyReqType = iota ProxyReqJoin // {sub}. ProxyReqLeave // {leave} ProxyReqMeta // {meta set|get} ProxyReqBroadcast // {pub}, {note} ProxyReqBgSession ProxyReqMeUserAgent ProxyReqCall // Used in video call proxy sessions for routing call events. )
Individual request types.
type Server ¶
type Server struct{}
func NewServer ¶
func NewServer(sa ServerArgs) *Server
func (*Server) InitVideoCalls ¶
func (s *Server) InitVideoCalls(c *config.WebrtcConfig) error
type ServerArgs ¶
type ServerComMessage ¶
type ServerComMessage struct { Ctrl *MsgServerCtrl `json:"ctrl,omitempty"` Data *MsgServerData `json:"data,omitempty"` Meta *MsgServerMeta `json:"meta,omitempty"` Pres *MsgServerPres `json:"pres,omitempty"` Info *MsgServerInfo `json:"info,omitempty"` // MsgServerData has no Id field, copying it here for use in {ctrl} acknowledgements Id string `json:"-"` // Routable (expanded) name of the topic. RcptTo string `json:"-"` // User ID of the sender of the original message. AsUser string `json:"-"` // Timestamp for consistency of timestamps in {ctrl} messages // (corresponds to originating client message receipt timestamp). Timestamp time.Time `json:"-"` // Session ID to skip when sending packet to sessions. Used to skip sending to original session. // Could be either empty. SkipSid string `json:"-"` // contains filtered or unexported fields }
ServerComMessage is a wrapper for server-side messages.
func ErrAPIKeyRequired ¶
func ErrAPIKeyRequired(ts time.Time) *ServerComMessage
ErrAPIKeyRequired valid API key is required (403).
func ErrAlreadyAuthenticated ¶
func ErrAlreadyAuthenticated(id, topic string, ts time.Time) *ServerComMessage
ErrAlreadyAuthenticated invalid attempt to authenticate an already authenticated session. Switching users is not supported (409).
func ErrAlreadyExists ¶
func ErrAlreadyExists(id, topic string, ts time.Time) *ServerComMessage
ErrAlreadyExists the object already exists (409).
func ErrAttachFirst ¶
func ErrAttachFirst(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrAttachFirst must attach to topic first in response to a client message (409).
func ErrAuthFailed ¶
func ErrAuthFailed(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrAuthFailed authentication failed with explicit server and incoming request timestamps (401).
func ErrAuthRequired ¶
func ErrAuthRequired(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrAuthRequired authentication required - user must authenticate first (401).
func ErrAuthRequiredReply ¶
func ErrAuthRequiredReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrAuthRequiredReply authentication required - user must authenticate first in response to a client request (401).
func ErrAuthUnknownScheme ¶
func ErrAuthUnknownScheme(id, topic string, ts time.Time) *ServerComMessage
ErrAuthUnknownScheme authentication scheme is unrecognized or invalid (401).
func ErrCallBusyExplicitTs ¶
func ErrCallBusyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrCallBusyExplicitTs indicates a "busy" reply to a video call request (486).
func ErrCallBusyReply ¶
func ErrCallBusyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrCallBusyReply indicates a "busy" reply in response to a video call request (486).
func ErrClusterUnreachableExplicitTs ¶
func ErrClusterUnreachableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrClusterUnreachableExplicitTs in-cluster communication has failed error with explicit server and incoming request timestamps (502).
func ErrClusterUnreachableReply ¶
func ErrClusterUnreachableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrClusterUnreachableReply in-cluster communication has failed error as response to a client request (502).
func ErrCommandOutOfSequence ¶
func ErrCommandOutOfSequence(id, unused string, ts time.Time) *ServerComMessage
ErrCommandOutOfSequence invalid sequence of commands, i.e. attempt to {sub} before {hi} (409).
func ErrDuplicateCredential ¶
func ErrDuplicateCredential(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrDuplicateCredential attempt to create a duplicate credential with explicit server and incoming request timestamps (409).
func ErrGone ¶
func ErrGone(id, topic string, ts time.Time) *ServerComMessage
ErrGone topic deleted or user banned (410).
func ErrInvalidResponse ¶
func ErrInvalidResponse(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrInvalidResponse indicates that the client's response is invalid with explicit server and incoming request timestamps (406).
func ErrLocked ¶
func ErrLocked(id, topic string, ts time.Time) *ServerComMessage
ErrLocked operation rejected because the topic is being deleted (503).
func ErrLockedExplicitTs ¶
func ErrLockedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrLockedExplicitTs operation rejected because the topic is being deleted with explicit server and incoming request timestamps (503).
func ErrLockedReply ¶
func ErrLockedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrLockedReply operation rejected because the topic is being deleted in response to a client request (503).
func ErrMalformed ¶
func ErrMalformed(id, topic string, ts time.Time) *ServerComMessage
ErrMalformed request malformed (400).
func ErrMalformedExplicitTs ¶
func ErrMalformedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrMalformedExplicitTs request malformed with explicit server and incoming request timestamps (400).
func ErrMalformedReply ¶
func ErrMalformedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrMalformedReply request malformed in response to a client request (400).
func ErrNotFound ¶
func ErrNotFound(id, topic string, ts time.Time) *ServerComMessage
ErrNotFound is an error for missing objects other than user or topic (404).
func ErrNotFoundExplicitTs ¶
func ErrNotFoundExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrNotFoundExplicitTs is an error for missing objects other than user or topic with explicit server and incoming request timestamps (404).
func ErrNotFoundReply ¶
func ErrNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrNotFoundReply is an error for missing objects other than user or topic with explicit server and incoming request timestamps in response to a client request (404).
func ErrNotImplemented ¶
func ErrNotImplemented(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrNotImplemented feature not implemented with explicit server and incoming request timestamps (501). TODO: consider changing status code to 4XX.
func ErrNotImplementedReply ¶
func ErrNotImplementedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrNotImplementedReply feature not implemented error in response to a client request (501).
func ErrOperationNotAllowed ¶
func ErrOperationNotAllowed(id, topic string, ts time.Time) *ServerComMessage
ErrOperationNotAllowed a valid operation is not permitted in this context (405).
func ErrOperationNotAllowedExplicitTs ¶
func ErrOperationNotAllowedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrOperationNotAllowedExplicitTs a valid operation is not permitted in this context with explicit server and incoming request timestamps (405).
func ErrOperationNotAllowedReply ¶
func ErrOperationNotAllowedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrOperationNotAllowedReply a valid operation is not permitted in this context with explicit server and incoming request timestamps in response to a client request (405).
func ErrPermissionDenied ¶
func ErrPermissionDenied(id, topic string, ts time.Time) *ServerComMessage
ErrPermissionDenied user is authenticated but operation is not permitted (403).
func ErrPermissionDeniedExplicitTs ¶
func ErrPermissionDeniedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrPermissionDeniedExplicitTs user is authenticated but operation is not permitted with explicit server and incoming request timestamps (403).
func ErrPermissionDeniedReply ¶
func ErrPermissionDeniedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrPermissionDeniedReply user is authenticated but operation is not permitted with explicit server and incoming request timestamps in response to a client request (403).
func ErrPolicy ¶
func ErrPolicy(id, topic string, ts time.Time) *ServerComMessage
ErrPolicy request violates a policy (e.g. password is too weak or too many subscribers) (422).
func ErrPolicyExplicitTs ¶
func ErrPolicyExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrPolicyExplicitTs request violates a policy (e.g. password is too weak or too many subscribers) with explicit server and incoming request timestamps (422).
func ErrPolicyReply ¶
func ErrPolicyReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrPolicyReply request violates a policy (e.g. password is too weak or too many subscribers) with explicit server and incoming request timestamps in response to a client request (422).
func ErrServiceUnavailableExplicitTs ¶
func ErrServiceUnavailableExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrServiceUnavailableExplicitTs server overloaded error with explicit server and incoming request timestamps (503).
func ErrServiceUnavailableReply ¶
func ErrServiceUnavailableReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrServiceUnavailableReply server overloaded error in response to a client request (503).
func ErrSessionNotFound ¶
func ErrSessionNotFound(ts time.Time) *ServerComMessage
ErrSessionNotFound session is not found (404).
func ErrTooLarge ¶
func ErrTooLarge(id, topic string, ts time.Time) *ServerComMessage
ErrTooLarge packet or request size exceeded the limit (413).
func ErrTopicNotFound ¶
func ErrTopicNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrTopicNotFound topic is not found with explicit server and incoming request timestamps (404).
func ErrTopicNotFoundReply ¶
func ErrTopicNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrTopicNotFoundReply topic is not found with explicit server and incoming request timestamps in response to a client request (404).
func ErrUnknown ¶
func ErrUnknown(id, topic string, ts time.Time) *ServerComMessage
ErrUnknown database or other server error (500).
func ErrUnknownExplicitTs ¶
func ErrUnknownExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrUnknownExplicitTs database or other server error with explicit server and incoming request timestamps (500).
func ErrUnknownReply ¶
func ErrUnknownReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrUnknownReply database or other server error in response to a client request (500).
func ErrUserNotFound ¶
func ErrUserNotFound(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
ErrUserNotFound user is not found with explicit server and incoming request timestamps (404).
func ErrUserNotFoundReply ¶
func ErrUserNotFoundReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
ErrUserNotFoundReply user is not found with explicit server and incoming request timestamps in response to a client request (404).
func ErrVersionNotSupported ¶
func ErrVersionNotSupported(id string, ts time.Time) *ServerComMessage
ErrVersionNotSupported invalid (too low) protocol version (505).
func InfoAlreadySubscribed ¶
func InfoAlreadySubscribed(id, topic string, ts time.Time) *ServerComMessage
InfoAlreadySubscribed response means request to subscribe was ignored because user is already subscribed (304).
func InfoAuthReset ¶
func InfoAuthReset(id string, ts time.Time) *ServerComMessage
InfoAuthReset is sent in response to request to reset authentication when it was completed but login was not performed (301).
func InfoChallenge ¶
func InfoChallenge(id string, ts time.Time, challenge []byte) *ServerComMessage
InfoChallenge requires user to respond to presented challenge before login can be completed (300).
func InfoFound ¶
func InfoFound(id, topic string, ts time.Time) *ServerComMessage
InfoFound redirects to a new resource (307).
func InfoNoAction ¶
func InfoNoAction(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
InfoNoAction response means request was ignored because the object was already in the desired state with explicit server and incoming request timestamps (304).
func InfoNoActionReply ¶
func InfoNoActionReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
InfoNoActionReply response means request was ignored because the object was already in the desired state in response to a client request (304).
func InfoNotJoined ¶
func InfoNotJoined(id, topic string, ts time.Time) *ServerComMessage
InfoNotJoined response means request to leave was ignored because user was not subscribed (304).
func InfoNotModified ¶
func InfoNotModified(id, topic string, ts time.Time) *ServerComMessage
InfoNotModified response means update request was a noop (304).
func InfoNotModifiedExplicitTs ¶
func InfoNotModifiedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
InfoNotModifiedExplicitTs response means update request was a noop with explicit server and incoming request timestamps (304).
func InfoNotModifiedReply ¶
func InfoNotModifiedReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
InfoNotModifiedReply response means update request was a noop in response to a client request (304).
func InfoUseOther ¶
func InfoUseOther(id, topic, other string, serverTs, incomingReqTs time.Time) *ServerComMessage
InfoUseOther is a response to a subscription request redirecting client to another topic (303).
func InfoUseOtherReply ¶
func InfoUseOtherReply(msg *ClientComMessage, other string, ts time.Time) *ServerComMessage
InfoUseOtherReply is a response to a subscription request redirecting client to another topic (303).
func InfoValidateCredentials ¶
func InfoValidateCredentials(id string, ts time.Time) *ServerComMessage
InfoValidateCredentials requires user to confirm credentials before going forward (300).
func InfoValidateCredentialsExplicitTs ¶
func InfoValidateCredentialsExplicitTs(id string, serverTs, incomingReqTs time.Time) *ServerComMessage
InfoValidateCredentialsExplicitTs requires user to confirm credentials before going forward with explicit server and incoming request timestamps (300).
func NoContentParams ¶
func NoContentParams(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage
NoContentParams indicates request was processed but resulted in no content (204).
func NoContentParamsReply ¶
func NoContentParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage
NoContentParamsReply indicates request was processed but resulted in no content in response to a client request (204).
func NoErr ¶
func NoErr(id, topic string, ts time.Time) *ServerComMessage
NoErr indicates successful completion (200).
func NoErrAccepted ¶
func NoErrAccepted(id, topic string, ts time.Time) *ServerComMessage
NoErrAccepted indicates request was accepted but not processed yet (202).
func NoErrAcceptedExplicitTs ¶
func NoErrAcceptedExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
NoErrAcceptedExplicitTs indicates request was accepted but not processed yet with explicit server and incoming request timestamps (202).
func NoErrCreated ¶
func NoErrCreated(id, topic string, ts time.Time) *ServerComMessage
NoErrCreated indicates successful creation of an object (201).
func NoErrDeliveredParams ¶
func NoErrDeliveredParams(id, topic string, ts time.Time, params any) *ServerComMessage
NoErrDeliveredParams means requested content has been delivered (208).
func NoErrEvicted ¶
func NoErrEvicted(id, topic string, ts time.Time) *ServerComMessage
NoErrEvicted indicates that the user was disconnected from topic through no fault of the user (205).
func NoErrExplicitTs ¶
func NoErrExplicitTs(id, topic string, serverTs, incomingReqTs time.Time) *ServerComMessage
NoErrExplicitTs indicates successful completion with explicit server and incoming request timestamps (200).
func NoErrParams ¶
func NoErrParams(id, topic string, ts time.Time, params any) *ServerComMessage
NoErrParams indicates successful completion with additional parameters (200).
func NoErrParamsExplicitTs ¶
func NoErrParamsExplicitTs(id, topic string, serverTs, incomingReqTs time.Time, params any) *ServerComMessage
NoErrParamsExplicitTs indicates successful completion with additional parameters and explicit server and incoming request timestamps (200).
func NoErrParamsReply ¶
func NoErrParamsReply(msg *ClientComMessage, ts time.Time, params any) *ServerComMessage
NoErrParamsReply indicates successful completion with additional parameters and explicit server and incoming request timestamps in response to a client request (200).
func NoErrReply ¶
func NoErrReply(msg *ClientComMessage, ts time.Time) *ServerComMessage
NoErrReply indicates successful completion as a reply to a client message (200).
func NoErrShutdown ¶
func NoErrShutdown(ts time.Time) *ServerComMessage
NoErrShutdown means user was disconnected from topic because system shutdown is in progress (205).
func (*ServerComMessage) Describe ¶
func (src *ServerComMessage) Describe() string
type SessionArgs ¶
type SessionProto ¶
type SessionProto int
SessionProto is the type of the wire transport.
const ( // NONE is undefined/not set. NONE SessionProto = iota // WEBSOCK represents websocket connection. WEBSOCK // LPOLL represents a long polling connection. LPOLL // GRPC is a gRPC connection GRPC // PROXY is temporary session used as a proxy at master node. PROXY // MULTIPLEX is a multiplexing session representing a connection from proxy topic to master. MULTIPLEX )
Constants defining individual types of wire transports.
type SessionStore ¶
type SessionStore struct {
// contains filtered or unexported fields
}
SessionStore holds live sessions. Long polling sessions are stored in a linked list with
most recent sessions on top. In addition all sessions are stored in a map indexed by session ID.
func NewSessionStore ¶
func NewSessionStore(sa SessionArgs) *SessionStore
NewSessionStore initializes a session store.
func (*SessionStore) Delete ¶
func (ss *SessionStore) Delete(s *Session)
Delete removes session from store.
func (*SessionStore) EvictUser ¶
func (ss *SessionStore) EvictUser(uid types.Uid, skipSid string)
EvictUser terminates all sessions of a given user.
func (*SessionStore) Get ¶
func (ss *SessionStore) Get(sid string) *Session
Get fetches a session from store by session ID.
func (*SessionStore) NewSession ¶
func (ss *SessionStore) NewSession(conn any, sid string) (*Session, int)
NewSession creates a new session and saves it to the session store.
func (*SessionStore) NodeRestarted ¶
func (ss *SessionStore) NodeRestarted(nodeName string, fingerprint int64)
NodeRestarted removes stale sessions from a restarted cluster node.
- nodeName is the name of affected node
- fingerprint is the new fingerprint of the node.
func (*SessionStore) Range ¶
func (ss *SessionStore) Range(f func(sid string, s *Session) bool)
Range calls given function for all sessions. It stops if the function returns false.
func (*SessionStore) Shutdown ¶
func (ss *SessionStore) Shutdown()
Shutdown terminates sessionStore. No need to clean up. Don't send to clustered sessions, their servers are not being shut down.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is a mapper of sessions to topics.
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic is an isolated communication channel.
type UserCacheReq ¶
type UserCacheReq struct { // Name of the node sending this request in case of cluster. Not set otherwise. Node string // UserId is set when count of unread messages is updated for a single user or // when the user is being deleted. UserId types.Uid // UserIdList is set when subscription count is updated for users of a topic. UserIdList []types.Uid // Unread count (UserId is set) Unread int // In case of set UserId: treat Unread count as an increment as opposed to the final value. // In case of set UserIdList: increment (Inc == true) or decrement subscription count by one. Inc bool // User is being deleted, remove user from cache. Gone bool // Optional push notification PushRcpt *pt.Receipt }
UserCacheReq contains data which mutates one or more user cache entries.
Source Files ¶
- bounded_wait_group.go
- call_video.go
- call_video_types.go
- cluster.go
- cluster_init.go
- cluster_leader.go
- cluster_node.go
- cluster_types.go
- datamodel_funcs.go
- datamodel_types.go
- hdl_files.go
- hdl_grpc.go
- hdl_longpool.go
- hdl_websock.go
- http.go
- http_types.go
- hub.go
- hub_types.go
- pbconverter.go
- plugins.go
- plugins_helpers.go
- plugins_types.go
- pres.go
- pres_types.go
- push.go
- server.go
- session.go
- session_clean.go
- session_helpers.go
- session_store.go
- session_types.go
- topic.go
- topic_helpers.go
- topic_init.go
- topic_proxy.go
- topic_types.go
- users.go
- users_types.go
- utils.go