Documentation ¶
Overview ¶
Package centrifuge is a real-time messaging library that abstracts several bidirectional transports (Websocket and its emulation over HTTP-Streaming, SSE/EventSource) and provides primitives to build scalable real-time applications with Go. It's also used as a core of Centrifugo server (https://github.com/centrifugal/centrifugo).
Centrifuge library provides several features on top of plain Websocket implementation and comes with several client SDKs – see more details in the library README on GitHub – https://github.com/centrifugal/centrifuge.
The API of this library is almost all goroutine-safe except cases where one-time operations like setting callback handlers performed, also your code inside event handlers should be synchronized since event handlers can be called concurrently. Library expects that code inside event handlers will not block. See more information about client connection lifetime and event handler order/concurrency in README on GitHub.
Also check out examples in repo to see main library concepts in action.
Index ¶
- Constants
- Variables
- func HandleReadFrame(c *Client, r io.Reader) bool
- func LogLevelToString(l LogLevel) string
- func NewClient(ctx context.Context, n *Node, t Transport) (*Client, ClientCloseFunc, error)
- func SetCredentials(ctx context.Context, cred *Credentials) context.Context
- type AliveHandler
- type Broker
- type BrokerEventHandler
- type CacheEmptyEvent
- type CacheEmptyHandler
- type CacheEmptyReply
- type ChannelContext
- type ChannelMediumOptions
- type Client
- func (c *Client) AcquireStorage() (map[string]any, func(map[string]any))
- func (c *Client) Channels() []string
- func (c *Client) ChannelsWithContext() map[string]ChannelContext
- func (c *Client) Connect(req ConnectRequest)
- func (c *Client) ConnectNoErrorToDisconnect(req ConnectRequest) error
- func (c *Client) Context() context.Context
- func (c *Client) Disconnect(disconnect ...Disconnect)
- func (c *Client) HandleCommand(cmd *protocol.Command, cmdProtocolSize int) bool
- func (c *Client) ID() string
- func (c *Client) Info() []byte
- func (c *Client) IsSubscribed(ch string) bool
- func (c *Client) OnAlive(h AliveHandler)
- func (c *Client) OnDisconnect(h DisconnectHandler)
- func (c *Client) OnHistory(h HistoryHandler)
- func (c *Client) OnMessage(h MessageHandler)
- func (c *Client) OnPresence(h PresenceHandler)
- func (c *Client) OnPresenceStats(h PresenceStatsHandler)
- func (c *Client) OnPublish(h PublishHandler)
- func (c *Client) OnRPC(h RPCHandler)
- func (c *Client) OnRefresh(h RefreshHandler)
- func (c *Client) OnStateSnapshot(h StateSnapshotHandler)
- func (c *Client) OnSubRefresh(h SubRefreshHandler)
- func (c *Client) OnSubscribe(h SubscribeHandler)
- func (c *Client) OnUnsubscribe(h UnsubscribeHandler)
- func (c *Client) Refresh(opts ...RefreshOption) error
- func (c *Client) Send(data []byte) error
- func (c *Client) StateSnapshot() (any, error)
- func (c *Client) Subscribe(channel string, opts ...SubscribeOption) error
- func (c *Client) Transport() TransportInfo
- func (c *Client) Unsubscribe(ch string, unsubscribe ...Unsubscribe)
- func (c *Client) UserID() string
- func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error
- type ClientCloseFunc
- type ClientInfo
- type Closer
- type CommandProcessedEvent
- type CommandProcessedHandler
- type CommandReadEvent
- type CommandReadHandler
- type Config
- type ConnectEvent
- type ConnectHandler
- type ConnectReply
- type ConnectRequest
- type ConnectingHandler
- type Credentials
- type DeltaType
- type Disconnect
- type DisconnectEvent
- type DisconnectHandler
- type DisconnectOption
- type DisconnectOptions
- type EmulationConfig
- type EmulationHandler
- type Error
- type HTTPStreamConfig
- type HTTPStreamHandler
- type HistoryCallback
- type HistoryEvent
- type HistoryFilter
- type HistoryHandler
- type HistoryOption
- type HistoryOptions
- type HistoryReply
- type HistoryResult
- type Hub
- func (h *Hub) BroadcastPublication(ch string, pub *Publication, sp StreamPosition) error
- func (h *Hub) Channels() []string
- func (h *Hub) Connections() map[string]*Client
- func (h *Hub) NumChannels() int
- func (h *Hub) NumClients() int
- func (h *Hub) NumSubscribers(ch string) int
- func (h *Hub) NumSubscriptions() int
- func (h *Hub) NumUsers() int
- func (h *Hub) UserConnections(userID string) map[string]*Client
- type Info
- type LogEntry
- type LogHandler
- type LogLevel
- type MemoryBroker
- func (b *MemoryBroker) Close(_ context.Context) error
- func (b *MemoryBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
- func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
- func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error
- func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error
- func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error
- func (b *MemoryBroker) RemoveHistory(ch string) error
- func (b *MemoryBroker) Run(h BrokerEventHandler) error
- func (b *MemoryBroker) Subscribe(_ string) error
- func (b *MemoryBroker) Unsubscribe(_ string) error
- type MemoryBrokerConfig
- type MemoryPresenceManager
- func (m *MemoryPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error
- func (m *MemoryPresenceManager) Close(_ context.Context) error
- func (m *MemoryPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)
- func (m *MemoryPresenceManager) PresenceStats(ch string) (PresenceStats, error)
- func (m *MemoryPresenceManager) RemovePresence(ch string, clientID string, _ string) error
- type MemoryPresenceManagerConfig
- type MessageEvent
- type MessageHandler
- type Metrics
- type MetricsConfig
- type Node
- func (n *Node) Config() Config
- func (n *Node) Disconnect(userID string, opts ...DisconnectOption) error
- func (n *Node) HandleControl(data []byte) error
- func (n *Node) HandleJoin(ch string, info *ClientInfo) error
- func (n *Node) HandleLeave(ch string, info *ClientInfo) error
- func (n *Node) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, ...) error
- func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error)
- func (n *Node) Hub() *Hub
- func (n *Node) ID() string
- func (n *Node) Info() (Info, error)
- func (n *Node) Notify(op string, data []byte, toNodeID string) error
- func (n *Node) NotifyShutdown() chan struct{}
- func (n *Node) OnCacheEmpty(h CacheEmptyHandler)
- func (n *Node) OnCommandProcessed(handler CommandProcessedHandler)
- func (n *Node) OnCommandRead(handler CommandReadHandler)
- func (n *Node) OnConnect(handler ConnectHandler)
- func (n *Node) OnConnecting(handler ConnectingHandler)
- func (n *Node) OnNodeInfoSend(handler NodeInfoSendHandler)
- func (n *Node) OnNotification(handler NotificationHandler)
- func (n *Node) OnSurvey(handler SurveyHandler)
- func (n *Node) OnTransportWrite(handler TransportWriteHandler)
- func (n *Node) Presence(ch string) (PresenceResult, error)
- func (n *Node) PresenceStats(ch string) (PresenceStatsResult, error)
- func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error)
- func (n *Node) Refresh(userID string, opts ...RefreshOption) error
- func (n *Node) RemoveHistory(ch string) error
- func (n *Node) Run() error
- func (n *Node) SetBroker(b Broker)
- func (n *Node) SetPresenceManager(m PresenceManager)
- func (n *Node) Shutdown(ctx context.Context) error
- func (n *Node) Subscribe(userID string, channel string, opts ...SubscribeOption) error
- func (n *Node) Survey(ctx context.Context, op string, data []byte, toNodeID string) (map[string]SurveyResult, error)
- func (n *Node) Unsubscribe(userID string, channel string, opts ...UnsubscribeOption) error
- type NodeInfo
- type NodeInfoSendHandler
- type NodeInfoSendReply
- type NotificationEvent
- type NotificationHandler
- type PingPongConfig
- type PresenceCallback
- type PresenceEvent
- type PresenceHandler
- type PresenceManager
- type PresenceReply
- type PresenceResult
- type PresenceStats
- type PresenceStatsCallback
- type PresenceStatsEvent
- type PresenceStatsHandler
- type PresenceStatsReply
- type PresenceStatsResult
- type ProtocolType
- type ProtocolVersion
- type Publication
- type PublishCallback
- type PublishEvent
- type PublishHandler
- type PublishOption
- func WithClientInfo(info *ClientInfo) PublishOption
- func WithDelta(enabled bool) PublishOption
- func WithHistory(size int, ttl time.Duration, metaTTL ...time.Duration) PublishOption
- func WithIdempotencyKey(key string) PublishOption
- func WithIdempotentResultTTL(ttl time.Duration) PublishOption
- func WithTags(meta map[string]string) PublishOption
- type PublishOptions
- type PublishReply
- type PublishResult
- type RPCCallback
- type RPCEvent
- type RPCHandler
- type RPCReply
- type RecoveryMode
- type RedisBroker
- func (b *RedisBroker) Close(_ context.Context) error
- func (b *RedisBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
- func (b *RedisBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
- func (b *RedisBroker) PublishControl(data []byte, nodeID string, _ string) error
- func (b *RedisBroker) PublishJoin(ch string, info *ClientInfo) error
- func (b *RedisBroker) PublishLeave(ch string, info *ClientInfo) error
- func (b *RedisBroker) RemoveHistory(ch string) error
- func (b *RedisBroker) Run(h BrokerEventHandler) error
- func (b *RedisBroker) Subscribe(ch string) error
- func (b *RedisBroker) Unsubscribe(ch string) error
- type RedisBrokerConfig
- type RedisPresenceManager
- func (m *RedisPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error
- func (m *RedisPresenceManager) Close(_ context.Context) error
- func (m *RedisPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)
- func (m *RedisPresenceManager) PresenceStats(ch string) (PresenceStats, error)
- func (m *RedisPresenceManager) RemovePresence(ch string, clientID string, userID string) error
- type RedisPresenceManagerConfig
- type RedisShard
- type RedisShardConfig
- type RefreshCallback
- type RefreshEvent
- type RefreshHandler
- type RefreshOption
- type RefreshOptions
- type RefreshReply
- type RegistererGatherer
- type SSEConfig
- type SSEHandler
- type StateSnapshotHandler
- type StreamPosition
- type SubRefreshCallback
- type SubRefreshEvent
- type SubRefreshHandler
- type SubRefreshReply
- type SubscribeCallback
- type SubscribeEvent
- type SubscribeHandler
- type SubscribeOption
- func WithChannelInfo(chanInfo []byte) SubscribeOption
- func WithEmitJoinLeave(enabled bool) SubscribeOption
- func WithEmitPresence(enabled bool) SubscribeOption
- func WithExpireAt(expireAt int64) SubscribeOption
- func WithPositioning(enabled bool) SubscribeOption
- func WithPushJoinLeave(enabled bool) SubscribeOption
- func WithRecoverSince(since *StreamPosition) SubscribeOption
- func WithRecovery(enabled bool) SubscribeOption
- func WithRecoveryMode(mode RecoveryMode) SubscribeOption
- func WithSubscribeClient(clientID string) SubscribeOption
- func WithSubscribeData(data []byte) SubscribeOption
- func WithSubscribeHistoryMetaTTL(metaTTL time.Duration) SubscribeOption
- func WithSubscribeSession(sessionID string) SubscribeOption
- func WithSubscribeSource(source uint8) SubscribeOption
- type SubscribeOptions
- type SubscribeReply
- type SubscribeRequest
- type SurveyCallback
- type SurveyEvent
- type SurveyHandler
- type SurveyReply
- type SurveyResult
- type Transport
- type TransportInfo
- type TransportWriteEvent
- type TransportWriteHandler
- type Unsubscribe
- type UnsubscribeEvent
- type UnsubscribeHandler
- type UnsubscribeOption
- type UnsubscribeOptions
- type WebsocketConfig
- type WebsocketHandler
Constants ¶
const ( PushFlagConnect uint64 = 1 << iota PushFlagDisconnect PushFlagSubscribe PushFlagJoin PushFlagLeave PushFlagUnsubscribe PushFlagPublication PushFlagMessage )
It's possible to disable certain types of pushes to be sent to a client connection by using ClientConfig.DisabledPushFlags.
const ( // UnsubscribeCodeClient set when unsubscribe event was initiated // by an explicit client-side unsubscribe call. // Code is less than 2000 since it's never sent to a client connection. UnsubscribeCodeClient uint32 = 0 // UnsubscribeCodeDisconnect set when unsubscribe event was initiated // by a client disconnect process. // Code is less than 2000 since it's never sent to a client connection. UnsubscribeCodeDisconnect uint32 = 1 // UnsubscribeCodeServer set when unsubscribe event was initiated // by an explicit server-side unsubscribe call. UnsubscribeCodeServer uint32 = 2000 // UnsubscribeCodeInsufficient set when client unsubscribed from // a channel due to insufficient state in a stream. We expect client to // resubscribe after receiving this since it's still may be possible to // recover a state since known StreamPosition. UnsubscribeCodeInsufficient uint32 = 2500 // UnsubscribeCodeExpired set when client subscription expired. UnsubscribeCodeExpired uint32 = 2501 )
Known unsubscribe codes.
const NoLimit = -1
NoLimit defines that limit should not be applied.
Variables ¶
var ( // DisconnectShutdown issued when node is going to shut down. DisconnectShutdown = Disconnect{ Code: 3001, Reason: "shutdown", } // DisconnectServerError issued when internal error occurred on server. DisconnectServerError = Disconnect{ Code: 3004, Reason: "internal server error", } // DisconnectExpired issued when client connection expired. DisconnectExpired = Disconnect{ Code: 3005, Reason: "connection expired", } // DisconnectSubExpired issued when client subscription expired. DisconnectSubExpired = Disconnect{ Code: 3006, Reason: "subscription expired", } // DisconnectSlow issued when client can't read messages fast enough. DisconnectSlow = Disconnect{ Code: 3008, Reason: "slow", } // DisconnectWriteError issued when an error occurred while writing to // client connection. DisconnectWriteError = Disconnect{ Code: 3009, Reason: "write error", } // DisconnectInsufficientState issued when server detects wrong client // position in channel Publication stream. Disconnect allows client // to restore missed publications on reconnect. DisconnectInsufficientState = Disconnect{ Code: 3010, Reason: "insufficient state", } // DisconnectForceReconnect issued when server disconnects connection. DisconnectForceReconnect = Disconnect{ Code: 3011, Reason: "force reconnect", } // DisconnectNoPong may be issued when server disconnects bidirectional // connection due to no pong received to application-level server-to-client // pings in a configured time. DisconnectNoPong = Disconnect{ Code: 3012, Reason: "no pong", } // DisconnectTooManyRequests may be issued when client sends too many commands to a server. DisconnectTooManyRequests = Disconnect{ Code: 3013, Reason: "too many requests", } )
Some predefined non-terminal disconnect structures used by the library internally.
var ( // DisconnectInvalidToken issued when client came with invalid token. DisconnectInvalidToken = Disconnect{ Code: 3500, Reason: "invalid token", } // DisconnectBadRequest issued when client uses malformed protocol frames. DisconnectBadRequest = Disconnect{ Code: 3501, Reason: "bad request", } // DisconnectStale issued to close connection that did not become // authenticated in configured interval after dialing. DisconnectStale = Disconnect{ Code: 3502, Reason: "stale", } // DisconnectForceNoReconnect issued when server disconnects connection // and asks it to not reconnect again. DisconnectForceNoReconnect = Disconnect{ Code: 3503, Reason: "force disconnect", } // DisconnectConnectionLimit can be issued when client connection exceeds a // configured connection limit (per user ID or due to other rule). DisconnectConnectionLimit = Disconnect{ Code: 3504, Reason: "connection limit", } // DisconnectChannelLimit can be issued when client connection exceeds a // configured channel limit. DisconnectChannelLimit = Disconnect{ Code: 3505, Reason: "channel limit", } // DisconnectInappropriateProtocol can be issued when client connection format can not // handle incoming data. For example, this happens when JSON-based clients receive // binary data in a channel. This is usually an indicator of programmer error, JSON // clients can not handle binary. DisconnectInappropriateProtocol = Disconnect{ Code: 3506, Reason: "inappropriate protocol", } // DisconnectPermissionDenied may be issued when client attempts accessing a server without // enough permissions. DisconnectPermissionDenied = Disconnect{ Code: 3507, Reason: "permission denied", } // DisconnectNotAvailable may be issued when ErrorNotAvailable does not fit message type, for example // we issue DisconnectNotAvailable when client sends asynchronous message without MessageHandler set // on server side. DisconnectNotAvailable = Disconnect{ Code: 3508, Reason: "not available", } // DisconnectTooManyErrors may be issued when client generates too many errors. DisconnectTooManyErrors = Disconnect{ Code: 3509, Reason: "too many errors", } )
The codes below are built-in terminal codes.
var ( // ErrorInternal means server error, if returned this is a signal // that something went wrong with server itself and client most probably // not guilty. ErrorInternal = &Error{ Code: 100, Message: "internal server error", Temporary: true, } ErrorUnauthorized = &Error{ Code: 101, Message: "unauthorized", } // ErrorUnknownChannel means that channel name does not exist. ErrorUnknownChannel = &Error{ Code: 102, Message: "unknown channel", } // ErrorPermissionDenied means that access to resource not allowed. ErrorPermissionDenied = &Error{ Code: 103, Message: "permission denied", } // ErrorMethodNotFound means that method sent in command does not exist. ErrorMethodNotFound = &Error{ Code: 104, Message: "method not found", } // ErrorAlreadySubscribed returned when client wants to subscribe on channel // it already subscribed to. ErrorAlreadySubscribed = &Error{ Code: 105, Message: "already subscribed", } // ErrorLimitExceeded says that some sort of limit exceeded, server logs should // give more detailed information. See also ErrorTooManyRequests which is more // specific for rate limiting purposes. ErrorLimitExceeded = &Error{ Code: 106, Message: "limit exceeded", } // ErrorBadRequest says that server can not process received // data because it is malformed. Retrying request does not make sense. ErrorBadRequest = &Error{ Code: 107, Message: "bad request", } // ErrorNotAvailable means that resource is not enabled. ErrorNotAvailable = &Error{ Code: 108, Message: "not available", } // ErrorTokenExpired indicates that connection or subscription token expired. // In this case client should drop the current token and try to ask a new one. ErrorTokenExpired = &Error{ Code: 109, Message: "token expired", } // ErrorExpired indicates that connection expired (no token involved). ErrorExpired = &Error{ Code: 110, Message: "expired", } // ErrorTooManyRequests means that server rejected request due to // its rate limiting strategies. ErrorTooManyRequests = &Error{ Code: 111, Message: "too many requests", Temporary: true, } // ErrorUnrecoverablePosition means that stream does not contain required // range of publications to fulfill a history query. This can happen due to // expiration, size limitation or due to wrong epoch. ErrorUnrecoverablePosition = &Error{ Code: 112, Message: "unrecoverable position", } )
Here we define well-known errors that can be used in client protocol replies.
var DisconnectConnectionClosed = Disconnect{
Code: 3000,
Reason: "connection closed",
}
DisconnectConnectionClosed is a special Disconnect object used when client connection was closed without any advice from a server side. This can be a clean disconnect, or temporary disconnect of the client due to internet connection loss. Server can not distinguish the actual reason of disconnect.
Functions ¶
func HandleReadFrame ¶ added in v0.28.0
HandleReadFrame is a helper to read Centrifuge commands from frame-based io.Reader and process them. Frame-based means that EOF treated as the end of the frame, not the entire connection close.
func LogLevelToString ¶
LogLevelToString transforms Level to its string representation.
func SetCredentials ¶
func SetCredentials(ctx context.Context, cred *Credentials) context.Context
SetCredentials allows setting connection Credentials to Context. Credentials set to Context in authentication middleware will be used by Centrifuge library to authenticate user.
Types ¶
type AliveHandler ¶ added in v0.10.0
type AliveHandler func()
AliveHandler called periodically while connection alive. This is a helper to do periodic things which can tolerate some approximation in time. This callback will run every ClientPresenceUpdateInterval and can save you a timer.
type Broker ¶
type Broker interface { // Run called once on start when broker already set to node. At // this moment node is ready to process broker events. Run(BrokerEventHandler) error // Subscribe node on channel to listen all messages coming from channel. Subscribe(ch string) error // Unsubscribe node from channel to stop listening messages from it. Unsubscribe(ch string) error // Publish allows sending data into channel. Data should be // delivered to all clients subscribed to this channel at moment on any // Centrifuge node (with at most once delivery guarantee). // // Broker can optionally maintain publication history inside channel according // to PublishOptions provided. See History method for rules that should be implemented // for accessing publications from history stream. // // Saving message to a history stream and publish to PUB/SUB should be an atomic // operation per channel. If this is not true – then publication to one channel // must be serialized on the caller side, i.e. publish requests must be issued one // after another. Otherwise, the order of publications and stable behaviour of // subscribers with positioning/recovery enabled can't be guaranteed. // // StreamPosition returned here describes stream epoch and offset assigned to // the publication. For channels without history this StreamPosition should be // zero value. // Second bool value returned here means whether Publish was suppressed due to // the use of PublishOptions.IdempotencyKey. In this case StreamPosition is // returned from the cache maintained by Broker. Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error) // PublishJoin publishes Join Push message into channel. PublishJoin(ch string, info *ClientInfo) error // PublishLeave publishes Leave Push message into channel. PublishLeave(ch string, info *ClientInfo) error // PublishControl allows sending control command data. If nodeID is empty string // then message should be delivered to all running nodes, if nodeID is set then // message should be delivered only to node with specified ID. PublishControl(data []byte, nodeID, shardKey string) error // History used to extract Publications from history stream. // Publications returned according to HistoryFilter which allows to set several // filtering options. StreamPosition returned describes current history stream // top offset and epoch. History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error) // RemoveHistory removes history from channel. This is in general not // needed as history expires automatically (based on history_lifetime) // but sometimes can be useful for application logic. RemoveHistory(ch string) error }
Broker is responsible for PUB/SUB mechanics.
type BrokerEventHandler ¶
type BrokerEventHandler interface { // HandlePublication to handle received Publications. HandlePublication(ch string, pub *Publication, sp StreamPosition, useDelta bool, prevPub *Publication) error // HandleJoin to handle received Join messages. HandleJoin(ch string, info *ClientInfo) error // HandleLeave to handle received Leave messages. HandleLeave(ch string, info *ClientInfo) error // HandleControl to handle received control data. HandleControl(data []byte) error }
BrokerEventHandler can handle messages received from PUB/SUB system.
type CacheEmptyEvent ¶ added in v0.33.0
type CacheEmptyEvent struct {
Channel string
}
CacheEmptyEvent is issued when recovery mode is used but Centrifuge can't find Publication in history to recover from. This event allows application to decide what to do in this case – it's possible to populate the cache by sending actual data to a channel.
type CacheEmptyHandler ¶ added in v0.33.0
type CacheEmptyHandler func(CacheEmptyEvent) (CacheEmptyReply, error)
CacheEmptyHandler allows setting cache empty handler function.
type CacheEmptyReply ¶ added in v0.33.0
type CacheEmptyReply struct { // Populated when set to true tells Centrifuge that cache was populated and // in that case Centrifuge will try to recover missed Publication from history // one more time. Populated bool }
CacheEmptyReply contains fields determining the reaction on cache empty event.
type ChannelContext ¶
type ChannelContext struct { Source uint8 // contains filtered or unexported fields }
ChannelContext contains extra context for channel connection subscribed to. Note: this struct is aligned to consume less memory.
type ChannelMediumOptions ¶ added in v0.33.0
type ChannelMediumOptions struct { // KeepLatestPublication enables keeping latest publication which was broadcasted to channel subscribers on // this Node in the channel medium layer. This is helpful for supporting deltas in at most once scenario. KeepLatestPublication bool // check is only performed no more often than once in Config.ClientChannelPositionCheckDelay thus reducing // the load on broker in cases when channel has many subscribers. When message loss is detected medium layer // tells caller about this and also marks all channel subscribers with insufficient state flag. By default, // medium is not used for sync – in that case each individual connection syncs position independently. SharedPositionSync bool // contains filtered or unexported fields }
ChannelMediumOptions is an EXPERIMENTAL way to enable using a channel medium layer in Centrifuge. Note, channel medium layer is very unstable at the moment – do not use it in production! Channel medium layer is an optional per-channel intermediary between Broker PUB/SUB and Client connections. This intermediary layer may be used for various per-channel tweaks and optimizations. Channel medium comes with memory overhead depending on ChannelMediumOptions. At the same time, it can provide significant benefits in terms of overall system efficiency and flexibility.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents client connection to server.
func (*Client) AcquireStorage ¶ added in v0.29.2
AcquireStorage returns an attached connection storage (a map) and a function to be called when the application finished working with the storage map. Be accurate when using this API – avoid acquiring storage for a long time - i.e. on the time of IO operations. Do the work fast and release with the updated map. The API designed this way to allow reading, modifying or fully overriding storage map and avoid making deep copies each time. Note, that if storage map has not been initialized yet - i.e. if it's nil - then it will be initialized to an empty map and then returned – so you never receive nil map when acquiring. The purpose of this map is to simplify handling user-defined state during the lifetime of connection. Try to keep this map reasonably small. This API is EXPERIMENTAL and may be changed/removed.
func (*Client) Channels ¶
Channels returns a slice of channels client connection currently subscribed to.
func (*Client) ChannelsWithContext ¶ added in v0.26.0
func (c *Client) ChannelsWithContext() map[string]ChannelContext
ChannelsWithContext returns a map of channels client connection currently subscribed to with a ChannelContext.
func (*Client) Connect ¶ added in v0.16.0
func (c *Client) Connect(req ConnectRequest)
Connect supposed to be called only from a unidirectional transport layer to pass initial information about connection and thus initiate Node.OnConnecting event. Bidirectional transport initiate connecting workflow automatically since client passes Connect command upon successful connection establishment with a server. If there is an error during connect method processing Centrifuge extracts Disconnect from it and closes the connection with that Disconnect message.
func (*Client) ConnectNoErrorToDisconnect ¶ added in v0.33.5
func (c *Client) ConnectNoErrorToDisconnect(req ConnectRequest) error
ConnectNoErrorToDisconnect is the same as Client.Connect but does not try to extract Disconnect code from the error returned by the connect logic, instead it just returns the error to the caller. This error must be handled by the caller on the Transport level, and the connection must be closed on Transport level upon receiving an error.
func (*Client) Context ¶ added in v0.10.0
Context returns client Context. This context will be canceled as soon as client connection closes.
func (*Client) Disconnect ¶ added in v0.10.0
func (c *Client) Disconnect(disconnect ...Disconnect)
Disconnect client connection with specific disconnect code and reason. If zero args or nil passed then DisconnectForceNoReconnect is used.
This method internally creates a new goroutine at the moment to do closing stuff. An extra goroutine is required to solve disconnect and alive callback ordering/sync problems. Will be a noop if client already closed. As this method runs a separate goroutine client connection will be closed eventually (i.e. not immediately).
func (*Client) HandleCommand ¶ added in v0.24.0
HandleCommand processes a single protocol.Command. Supposed to be called only from a transport connection reader.
func (*Client) IsSubscribed ¶ added in v0.11.0
IsSubscribed returns true if client subscribed to a channel.
func (*Client) OnAlive ¶ added in v0.13.0
func (c *Client) OnAlive(h AliveHandler)
OnAlive allows setting AliveHandler. AliveHandler called periodically for active client connection.
func (*Client) OnDisconnect ¶ added in v0.13.0
func (c *Client) OnDisconnect(h DisconnectHandler)
OnDisconnect allows setting DisconnectHandler. DisconnectHandler called when client disconnected.
func (*Client) OnHistory ¶ added in v0.13.0
func (c *Client) OnHistory(h HistoryHandler)
OnHistory allows settings HistoryHandler. HistoryHandler called when History request from client received. At this moment you can only return a custom error or disconnect client.
func (*Client) OnMessage ¶ added in v0.13.0
func (c *Client) OnMessage(h MessageHandler)
OnMessage allows setting MessageHandler. MessageHandler called when client sent asynchronous message.
func (*Client) OnPresence ¶ added in v0.13.0
func (c *Client) OnPresence(h PresenceHandler)
OnPresence allows setting PresenceHandler. PresenceHandler called when Presence request from client received. At this moment you can only return a custom error or disconnect client.
func (*Client) OnPresenceStats ¶ added in v0.13.0
func (c *Client) OnPresenceStats(h PresenceStatsHandler)
OnPresenceStats allows settings PresenceStatsHandler. PresenceStatsHandler called when Presence Stats request from client received. At this moment you can only return a custom error or disconnect client.
func (*Client) OnPublish ¶ added in v0.13.0
func (c *Client) OnPublish(h PublishHandler)
OnPublish allows setting PublishHandler. PublishHandler called when client publishes message into channel.
func (*Client) OnRPC ¶ added in v0.13.0
func (c *Client) OnRPC(h RPCHandler)
OnRPC allows setting RPCHandler. RPCHandler will be executed on every incoming RPC call.
func (*Client) OnRefresh ¶ added in v0.13.0
func (c *Client) OnRefresh(h RefreshHandler)
OnRefresh allows setting RefreshHandler. RefreshHandler called when it's time to refresh expiring client connection.
func (*Client) OnStateSnapshot ¶ added in v0.23.0
func (c *Client) OnStateSnapshot(h StateSnapshotHandler)
OnStateSnapshot allows settings StateSnapshotHandler. This API is EXPERIMENTAL and may be changed/removed.
func (*Client) OnSubRefresh ¶ added in v0.13.0
func (c *Client) OnSubRefresh(h SubRefreshHandler)
OnSubRefresh allows setting SubRefreshHandler. SubRefreshHandler called when it's time to refresh client subscription.
func (*Client) OnSubscribe ¶ added in v0.13.0
func (c *Client) OnSubscribe(h SubscribeHandler)
OnSubscribe allows setting SubscribeHandler. SubscribeHandler called when client subscribes on a channel.
func (*Client) OnUnsubscribe ¶ added in v0.13.0
func (c *Client) OnUnsubscribe(h UnsubscribeHandler)
OnUnsubscribe allows setting UnsubscribeHandler. UnsubscribeHandler called when client unsubscribes from channel.
func (*Client) Refresh ¶ added in v0.18.0
func (c *Client) Refresh(opts ...RefreshOption) error
func (*Client) Send ¶
Send data to client. This sends an asynchronous message – data will be just written to connection. on client side this message can be handled with Message handler.
func (*Client) StateSnapshot ¶ added in v0.23.0
StateSnapshot allows collecting current state copy. Mostly useful for connection introspection from the outside. This API is EXPERIMENTAL and may be changed/removed.
func (*Client) Subscribe ¶ added in v0.5.0
func (c *Client) Subscribe(channel string, opts ...SubscribeOption) error
Subscribe client to a channel.
func (*Client) Transport ¶
func (c *Client) Transport() TransportInfo
Transport returns client connection transport information.
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(ch string, unsubscribe ...Unsubscribe)
Unsubscribe allows unsubscribing client from channel.
func (*Client) WritePublication ¶ added in v0.30.3
func (c *Client) WritePublication(channel string, publication *Publication, sp StreamPosition) error
WritePublication allows sending publications to Client subscription directly without HUB and Broker semantics. The possible use case is to turn subscription to a channel into an individual data stream. This API is EXPERIMENTAL and may be changed/removed.
type ClientCloseFunc ¶ added in v0.10.0
type ClientCloseFunc func() error
ClientCloseFunc must be called on Transport handler close to clean up Client.
type ClientInfo ¶
type ClientInfo struct { // ClientID is a client unique id. ClientID string // UserID is an ID of authenticated user. Zero value means anonymous user. UserID string // ConnInfo is additional information about connection. ConnInfo []byte // ChanInfo is additional information about connection in context of // channel subscription. ChanInfo []byte }
ClientInfo contains information about client connection.
type Closer ¶
type Closer interface { // Close when called should clean up used resources. Close(ctx context.Context) error }
Closer is an interface that Broker and PresenceManager can optionally implement if they need to close any resources on Centrifuge Node graceful shutdown.
type CommandProcessedEvent ¶ added in v0.28.0
type CommandProcessedEvent struct { // Command which was processed. May be pooled - see comment of CommandProcessedEvent. Command *protocol.Command // Error may be set to non-nil if Command processing resulted into error. Error error // Reply to the command. Reply may be pooled - see comment of CommandProcessedEvent. // This Reply may be nil, for example in the following cases: // 1. For Send command since send commands do not have replies // 2. When command processing resulted into disconnection of the client without sending a reply. // 3. When unidirectional transport connects (Centrifuge creates Connect Command artificially // with id: 1 and never sends replies to the unidirectional transport, only pushes). Reply *protocol.Reply // Started is a time command was passed to Client for processing. Started time.Time }
CommandProcessedEvent contains protocol.Command processed by Client. Command and Reply types and their fields in the event MAY BE POOLED by Centrifuge, so code which wants to use them AFTER CommandProcessedHandler handler returns MUST MAKE A COPY.
type CommandProcessedHandler ¶ added in v0.28.0
type CommandProcessedHandler func(*Client, CommandProcessedEvent)
CommandProcessedHandler allows setting a callback which will be called after Client processed a protocol.Command. This exists mostly for real-time connection tracing purposes. CommandProcessedHandler may be called after the corresponding Reply written to the connection and TransportWriteHandler called. But for tracing purposes this seems tolerable as commands and replies may be matched by id. Also, carefully read docs for CommandProcessedEvent to avoid possible bugs.
type CommandReadEvent ¶ added in v0.27.0
type CommandReadEvent struct { // Command which was read from the connection. May be pooled - see comment of CommandReadEvent. Command *protocol.Command // CommandSize is a size of command in bytes in its protocol representation. CommandSize int }
CommandReadEvent contains protocol.Command processed by Client. Command type and its fields in the event MAY BE POOLED by Centrifuge, so code which wants to use Command AFTER CommandReadHandler handler returns MUST MAKE A COPY.
type CommandReadHandler ¶ added in v0.27.0
type CommandReadHandler func(*Client, CommandReadEvent) error
CommandReadHandler allows setting a callback which will be called before Client processed a protocol.Command read from the connection. Return an error if you want to prevent command execution. Also, carefully read docs for CommandReadEvent to avoid possible bugs.
type Config ¶
type Config struct { // Version of server – if set will be sent to a client on connection // establishment phase in reply to connect command from a client. Version string // Name is a unique name of the current server Node. Name used as human-readable // and meaningful node identifier. If not set then os.Hostname will be used. Name string // LogLevel is a log level. By default, nothing will be logged by Centrifuge. LogLevel LogLevel // LogHandler is a handler function Node will send logs to. LogHandler LogHandler // NodeInfoMetricsAggregateInterval sets interval for automatic metrics // aggregation. It's not reasonable to have it less than one second. // Zero value means 60 * time.Second. NodeInfoMetricsAggregateInterval time.Duration // ClientConnectIncludeServerTime tells Centrifuge to append `time` field to Connect result of client protocol. // This field contains Unix timestamp in milliseconds and represents current server time. By default, server time // is not included. ClientConnectIncludeServerTime bool // ClientPresenceUpdateInterval sets an interval how often connected // clients update presence information. // Zero value means 25 * time.Second. ClientPresenceUpdateInterval time.Duration // ClientExpiredCloseDelay is an extra time given to client to refresh // its connection in the end of connection TTL. At moment only used for // a client-side refresh workflow. // Zero value means 25 * time.Second. ClientExpiredCloseDelay time.Duration // ClientExpiredSubCloseDelay is an extra time given to client to // refresh its expiring subscription in the end of subscription TTL. // At the moment only used for a client-side subscription refresh workflow. // Zero value means 25 * time.Second. ClientExpiredSubCloseDelay time.Duration // ClientStaleCloseDelay is a timeout after which connection will be // closed if still not authenticated (i.e. no valid connect command // received yet). // Zero value means 15 * time.Second. ClientStaleCloseDelay time.Duration // ClientChannelPositionCheckDelay defines minimal time from previous // client position check in channel. If client does not pass check it // will be disconnected with DisconnectInsufficientState. // Zero value means 40 * time.Second. ClientChannelPositionCheckDelay time.Duration // Maximum allowed time lag for publications for subscribers with positioning on. // When exceeded we mark connection with insufficient state. By default, not used - i.e. // Centrifuge does not take lag into account for positioning. // See also pub_sub_time_lag_seconds as a helpful metric. ClientChannelPositionMaxTimeLag time.Duration // ClientQueueMaxSize is a maximum size of client's message queue in // bytes. After this queue size exceeded Centrifuge closes client's connection. // Zero value means 1048576 bytes (1MB). ClientQueueMaxSize int // ClientChannelLimit sets upper limit of client-side channels each client // can subscribe to. Client-side subscriptions attempts will get an ErrorLimitExceeded // in subscribe reply. Server-side subscriptions above limit will result into // DisconnectChannelLimit. // Zero value means 128. ClientChannelLimit int // UserConnectionLimit limits number of client connections to single Node // from user with the same ID. Zero value means unlimited. Anonymous users // can't be tracked. UserConnectionLimit int // ChannelMaxLength is the maximum length of a channel name. This is only checked // for client-side subscription requests. // Zero value means 255. ChannelMaxLength int // HistoryMaxPublicationLimit allows limiting the maximum number of publications to be // asked over client API history call. This is useful when you have large streams and // want to prevent a massive number of missed messages to be sent to a client when // calling history without any limit explicitly set. By default, no limit used. // This option does not affect Node.History method. See also RecoveryMaxPublicationLimit. HistoryMaxPublicationLimit int // RecoveryMaxPublicationLimit allows limiting the number of Publications that could be // restored during the automatic recovery process. See also HistoryMaxPublicationLimit. // By default, no limit used. RecoveryMaxPublicationLimit int // UseSingleFlight allows turning on mode where singleflight will be automatically used // for Node.History (including recovery) and Node.Presence/Node.PresenceStats calls. UseSingleFlight bool // HistoryMetaTTL sets a time of stream meta key expiration in Redis. Stream // meta key is a Redis HASH that contains top offset in channel and epoch value. // In some cases – when channels created for а short time and then // not used anymore – created stream meta keys can stay in memory while // not actually useful. For example, you can have a personal user channel but // after using your app for a while user left it forever. In long-term // perspective this can be an unwanted memory leak. Setting a reasonable // value to this option (usually much bigger than history retention period) // can help. In this case unused channel stream metadata will eventually expire. // // Keep this value much larger than history stream TTL used when publishing. // When zero Centrifuge uses default 30 days which we believe is more than enough // for most use cases. HistoryMetaTTL time.Duration // Metrics is MetricsConfig to configure Prometheus metrics provided by Centrifuge. Metrics MetricsConfig // GetChannelMediumOptions is a way to provide ChannelMediumOptions for specific channel. // This function is called each time new channel appears on the Node. // See the doc comment for ChannelMediumOptions for more details about channel medium concept. GetChannelMediumOptions func(channel string) ChannelMediumOptions // GetBroker when set allows returning a custom Broker to use for a specific channel. If not set // then the default Node's Broker is always used for all channels. Also, Node's default Broker is // always used for control channels. It's the responsibility of an application to call Broker.Run // method of all brokers except the default one (called automatically inside Node.Run). Also, a // proper Broker shutdown is the responsibility of application because Node does not know about // custom Broker instances. When GetBroker returns false as the second argument then Node will // use the default Broker for the channel. GetBroker func(channel string) (Broker, bool) // GetPresenceManager when set allows returning a custom PresenceManager to use for a specific // channel. If not set then the default Node's PresenceManager is always used for all channels. // A proper PresenceManager shutdown is the responsibility of application because Node does not // know about custom PresenceManager instances. When GetPresenceManager returns false as the second // argument then Node will use the default PresenceManager for the channel. GetPresenceManager func(channel string) (PresenceManager, bool) // Tell Centrifuge how to transform connect error codes to disconnect objects for unidirectional // transports. If not set or code not found in the mapping then Centrifuge falls back to the default // mapping defined internally. UnidirectionalCodeToDisconnect map[uint32]Disconnect }
Config contains Node configuration options.
type ConnectEvent ¶
type ConnectEvent struct { // ClientID that was generated by library for client connection. ClientID string // Token received from client as part of Connect Command. Token string // Data received from client as part of Connect Command. Data []byte // Name can contain client name if provided on connect. Name string // Version can contain client version if provided on connect. Version string // Transport contains information about transport used by client. Transport TransportInfo // Channels is a list of channels a client wants to subscribe to // (server-side). It's just a way for a client to provide this list. // Server should use ConnectReply.Subscriptions to tell Centrifuge // the final list of server-side subscriptions for a connection which // can differ from the Channels list. Channels []string // Headers can be set by SDK to pass custom key-value headers from // client to server. It's up to the application to decide what to do // with these headers. Headers map[string]string }
ConnectEvent contains fields related to connecting event (when a server received Connect protocol command from client).
type ConnectHandler ¶
type ConnectHandler func(*Client)
ConnectHandler called when client connected to server and ready to communicate.
type ConnectReply ¶
type ConnectReply struct { // Context allows returning a modified context. Context context.Context // Credentials should be set if app wants to authenticate connection. // This field is optional since auth Credentials could be set through // HTTP middleware. Credentials *Credentials // Data allows setting custom data in connect reply. Data []byte // Subscriptions map contains channels to subscribe connection to on server-side. Subscriptions map[string]SubscribeOptions // ClientSideRefresh tells library to use client-side refresh logic: // i.e. send refresh commands with new connection token. If not set // then server-side refresh mechanism will be used. ClientSideRefresh bool // Storage can be used to fill initial connection storage during connecting. // This data may be then accessed/modified/replaced later during Client's lifetime // over Client.AcquireStorage() call. This API is EXPERIMENTAL. Storage map[string]any // MaxMessagesInFrame is the maximum number of messages (replies and pushes) which // Centrifuge Client message writer will collect from the client's queue before sending // to the connection. By default, it's 16. Use -1 to disable the limit. MaxMessagesInFrame int // WriteDelay is a time Centrifuge will try to collect messages inside message writer loop // before sending them towards this connection. Enabling WriteDelay may reduce CPU usage of // both server and client in case of high message rate inside individual connections. The // reduction happens due to the lesser number of system calls to execute. Enabling WriteDelay // limits the maximum throughput of messages towards the connection which may be achieved. // For example, if WriteDelay is 100ms then the max throughput per second will be // (1000 / 100) * MaxMessagesInFrame (16 by default), i.e. 160 messages per second. This // should be more than enough for target Centrifuge use cases (frontend apps) though. WriteDelay time.Duration // ReplyWithoutQueue when enabled will force Centrifuge to avoid using Client write // queue for sending replies to commands for this connection. Replies sent directly to // the Client's transport thus avoiding possible delays caused by writer loop, but replies // lose a chance to be batched. ReplyWithoutQueue bool // QueueInitialCap set an initial capacity for client's message queue, the size of queue // can grow further, but won't be reduced below QueueInitialCap. By default, it's 2. QueueInitialCap int // PingPongConfig if set, will override Transport's PingPongConfig to enable setting ping/pong interval // for individual client. PingPongConfig *PingPongConfig }
ConnectReply contains reaction to ConnectEvent.
type ConnectRequest ¶ added in v0.16.0
type ConnectRequest struct { // Token is an optional token from a client. Token string // Data is an optional custom data from a client. Data []byte // Name of a client. Name string // Version of a client. Version string // Subs is a map with channel subscription state (for recovery on connect). Subs map[string]SubscribeRequest }
ConnectRequest can be used in a unidirectional connection case to pass initial connection information from a client-side.
type ConnectingHandler ¶ added in v0.0.2
type ConnectingHandler func(context.Context, ConnectEvent) (ConnectReply, error)
ConnectingHandler called when new client authenticates on server.
type Credentials ¶
type Credentials struct { // UserID tells library an ID of current user. Leave this empty string // if you need access from anonymous user. UserID string // ExpireAt allows setting time in future when connection must be validated. // In this case Client.OnRefresh callback must be set by application. Zero // value means no expiration. ExpireAt int64 // Info contains additional information about connection. This data will be // included untouched into Join/Leave messages, into Presence information, // also info can become a part of published message as part of ClientInfo. // In some cases having additional info can be an undesired overhead – but // you are simply free to not use this field at all. Info []byte }
Credentials allow authenticating connection when set into context.
func GetCredentials ¶ added in v0.5.0
func GetCredentials(ctx context.Context) (*Credentials, bool)
GetCredentials allows extracting Credentials from Context (if set previously).
type DeltaType ¶ added in v0.33.0
type DeltaType string
const ( // DeltaTypeFossil is Fossil delta encoding. See https://fossil-scm.org/home/doc/tip/www/delta_encoder_algorithm.wiki. DeltaTypeFossil DeltaType = "fossil" )
type Disconnect ¶
type Disconnect struct { // Code is a disconnect code. Code uint32 `json:"code,omitempty"` // Reason is a short description of disconnect code for humans. Reason string `json:"reason"` }
Disconnect allows configuring how client will be disconnected from a server. A server can provide a Disconnect.Code and Disconnect.Reason to a client. Clients can execute some custom logic based on a certain Disconnect.Code. Code is also used for metric collection. Disconnect.Reason is optional and exists mostly for human-readable description of returned code – i.e. for logging, debugging etc.
The important note is that Disconnect.Reason must be less than 127 bytes due to WebSocket protocol limitations.
Codes have some rules which should be followed by a client connector implementation. These rules described below.
Codes in range 0-2999 should not be used by a Centrifuge library user. Those are reserved for the client-side and transport specific needs. Codes in range >=5000 should not be used also. Those are reserved by Centrifuge.
Client should reconnect upon receiving code in range 3000-3499, 4000-4499, >=5000. For codes <3000 reconnect behavior can be adjusted for specific transport.
Codes in range 3500-3999 and 4500-4999 are application terminal codes, no automatic reconnect should be made by a client implementation.
Library users supposed to use codes in range 4000-4999 for creating custom disconnects.
func (Disconnect) Error ¶ added in v0.10.0
func (d Disconnect) Error() string
Error to use Disconnect as a callback handler error to signal Centrifuge that client must be disconnected with corresponding Code and Reason.
func (Disconnect) String ¶ added in v0.8.2
func (d Disconnect) String() string
String representation.
type DisconnectEvent ¶
type DisconnectEvent struct { // Disconnect contains a Disconnect object which identifies the code and reason // of disconnect process. When disconnect was not initiated by a server this // is always DisconnectConnectionClosed. Disconnect }
DisconnectEvent contains fields related to disconnect event.
type DisconnectHandler ¶
type DisconnectHandler func(DisconnectEvent)
DisconnectHandler called when client disconnects from server. The important thing to remember is that you should not rely entirely on this handler to clean up non-expiring resources (in your database for example). Why? Because in case of any non-graceful node shutdown (kill -9, process crash, machine lost) disconnect handler will never be called (obviously) so you can have stale data.
type DisconnectOption ¶ added in v0.8.0
type DisconnectOption func(options *DisconnectOptions)
DisconnectOption is a type to represent various Disconnect options.
func WithCustomDisconnect ¶ added in v0.23.0
func WithCustomDisconnect(disconnect Disconnect) DisconnectOption
WithCustomDisconnect allows setting custom Disconnect.
func WithDisconnectClient ¶ added in v0.18.0
func WithDisconnectClient(clientID string) DisconnectOption
WithDisconnectClient allows setting Client.
func WithDisconnectClientWhitelist ¶ added in v0.18.0
func WithDisconnectClientWhitelist(whitelist []string) DisconnectOption
WithDisconnectClientWhitelist allows setting ClientWhitelist.
func WithDisconnectSession ¶ added in v0.21.1
func WithDisconnectSession(sessionID string) DisconnectOption
WithDisconnectSession allows setting session ID to disconnect.
type DisconnectOptions ¶ added in v0.8.0
type DisconnectOptions struct { // Disconnect represents custom disconnect to use. // By default, DisconnectForceNoReconnect will be used. Disconnect *Disconnect // ClientWhitelist contains client IDs to keep. ClientWhitelist []string // contains filtered or unexported fields }
DisconnectOptions define some fields to alter behaviour of Disconnect operation.
type EmulationConfig ¶ added in v0.23.0
type EmulationConfig struct { // MaxRequestBodySize limits request body size (in bytes). By default we accept 64kb max. MaxRequestBodySize int }
EmulationConfig is a config for EmulationHandler.
type EmulationHandler ¶ added in v0.23.0
type EmulationHandler struct {
// contains filtered or unexported fields
}
EmulationHandler allows receiving client protocol commands from client and proxy them to the right node (where client session lives). This makes it possible to use unidirectional transports for server-to-clients data flow but still emulate bidirectional connection - thanks to this handler. Redirection to the correct node works over Survey.
func NewEmulationHandler ¶ added in v0.23.0
func NewEmulationHandler(node *Node, config EmulationConfig) *EmulationHandler
NewEmulationHandler creates new EmulationHandler.
func (*EmulationHandler) ServeHTTP ¶ added in v0.23.0
func (s *EmulationHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)
type Error ¶
Error represents client reply error. Library user can define own application specific errors. When defining new custom errors use error codes in range [400, 1999] assuming that codes in interval 0-399 are reserved by Centrifuge.
type HTTPStreamConfig ¶ added in v0.23.0
type HTTPStreamConfig struct { PingPongConfig // MaxRequestBodySize limits request body size. MaxRequestBodySize int }
HTTPStreamConfig represents config for HTTPStreamHandler.
type HTTPStreamHandler ¶ added in v0.23.0
type HTTPStreamHandler struct {
// contains filtered or unexported fields
}
HTTPStreamHandler handles WebSocket client connections. WebSocket protocol is a bidirectional connection between a client and a server for low-latency communication.
func NewHTTPStreamHandler ¶ added in v0.23.0
func NewHTTPStreamHandler(node *Node, config HTTPStreamConfig) *HTTPStreamHandler
NewHTTPStreamHandler creates new HTTPStreamHandler.
func (*HTTPStreamHandler) ServeHTTP ¶ added in v0.23.0
func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
type HistoryCallback ¶ added in v0.13.0
type HistoryCallback func(HistoryReply, error)
HistoryCallback should be called with HistoryReply or error.
type HistoryEvent ¶ added in v0.10.0
type HistoryEvent struct { Channel string Filter HistoryFilter }
HistoryEvent has channel operation called for.
type HistoryFilter ¶
type HistoryFilter struct { // Since used to extract publications from stream since provided StreamPosition. Since *StreamPosition // Limit number of publications to return. // -1 means no limit - i.e. return all publications currently in stream. // 0 means that caller only interested in current stream top position so // Broker should not return any publications. Limit int // Reverse direction. Reverse bool }
HistoryFilter allows filtering history according to fields set.
type HistoryHandler ¶ added in v0.10.0
type HistoryHandler func(HistoryEvent, HistoryCallback)
HistoryHandler must handle incoming command from client.
type HistoryOption ¶ added in v0.8.0
type HistoryOption func(options *HistoryOptions)
HistoryOption is a type to represent various History options.
func WithHistoryFilter ¶ added in v0.29.0
func WithHistoryFilter(filter HistoryFilter) HistoryOption
func WithHistoryMetaTTL ¶ added in v0.29.0
func WithHistoryMetaTTL(metaTTL time.Duration) HistoryOption
func WithLimit ¶ added in v0.8.0
func WithLimit(limit int) HistoryOption
WithLimit allows setting HistoryOptions.Limit.
func WithReverse ¶ added in v0.18.0
func WithReverse(reverse bool) HistoryOption
WithReverse allows setting HistoryOptions.Reverse option.
func WithSince ¶ added in v0.17.0
func WithSince(sp *StreamPosition) HistoryOption
WithSince allows setting HistoryOptions.Since option.
type HistoryOptions ¶ added in v0.8.0
type HistoryOptions struct { // Filter for history publications. Filter HistoryFilter // MetaTTL allows overriding default (set in Config.HistoryMetaTTL) history // meta information expiration time. MetaTTL time.Duration }
HistoryOptions define some fields to alter History method behaviour.
type HistoryReply ¶ added in v0.10.0
type HistoryReply struct {
Result *HistoryResult
}
HistoryReply contains fields determining the reaction on history request.
type HistoryResult ¶ added in v0.8.0
type HistoryResult struct { // StreamPosition embedded here describes current stream top offset and epoch. StreamPosition // Publications extracted from history storage according to HistoryFilter. Publications []*Publication }
HistoryResult contains Publications and current stream top StreamPosition.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub tracks Client connections on the current Node.
func (*Hub) BroadcastPublication ¶ added in v0.17.1
func (h *Hub) BroadcastPublication(ch string, pub *Publication, sp StreamPosition) error
BroadcastPublication sends message to all clients subscribed on a channel on the current Node. Usually this is NOT what you need since in most cases you should use Node.Publish method which uses a Broker to deliver publications to all Nodes in a cluster and maintains publication history in a channel with incremental offset. By calling BroadcastPublication messages will only be sent to the current node subscribers without any defined offset semantics, without delta support.
func (*Hub) Connections ¶ added in v0.23.0
Connections returns all user connections to the current Node.
func (*Hub) NumChannels ¶
NumChannels returns a total number of different channels.
func (*Hub) NumClients ¶
NumClients returns total number of client connections.
func (*Hub) NumSubscribers ¶
NumSubscribers returns number of current subscribers for a given channel.
func (*Hub) NumSubscriptions ¶ added in v0.18.0
NumSubscriptions returns a total number of subscriptions.
type Info ¶
type Info struct {
Nodes []NodeInfo
}
Info contains information about all known server nodes.
type LogHandler ¶
type LogHandler func(LogEntry)
LogHandler handles log entries - i.e. writes into correct destination if necessary.
type LogLevel ¶
type LogLevel int
LogLevel describes the chosen log level.
const ( // LogLevelNone means no logging. LogLevelNone LogLevel = iota // LogLevelTrace turns on trace logs - should only be used during development. This // log level shows all client-server communication. LogLevelTrace // LogLevelDebug turns on debug logs - it's generally too much for production in normal // conditions but can help when developing and investigating problems in production. LogLevelDebug // LogLevelInfo logs useful server information. This includes various information // about problems with client connections which is not Centrifuge errors but // in most situations malformed client behaviour. LogLevelInfo // LogLevelWarn logs server warnings. This may contain tips for a developer about things // which should be addressed but usually not immediately. LogLevelWarn // LogLevelError level logs only server errors. This is logging that means non-working // Centrifuge and may require effort from developers/administrators to make things // work again. LogLevelError )
type MemoryBroker ¶ added in v0.16.0
type MemoryBroker struct {
// contains filtered or unexported fields
}
MemoryBroker is builtin default Broker which allows running Centrifuge-based server without any external broker. All data managed inside process memory.
With this Broker you can only run single Centrifuge node. If you need to scale you should consider using another Broker implementation instead – for example RedisBroker.
Running single node can be sufficient for many use cases especially when you need maximum performance and not too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.
func NewMemoryBroker ¶ added in v0.16.0
func NewMemoryBroker(n *Node, _ MemoryBrokerConfig) (*MemoryBroker, error)
NewMemoryBroker initializes MemoryBroker.
func (*MemoryBroker) Close ¶ added in v0.17.1
func (b *MemoryBroker) Close(_ context.Context) error
Close is noop for now.
func (*MemoryBroker) History ¶ added in v0.16.0
func (b *MemoryBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
History - see Broker interface description.
func (*MemoryBroker) Publish ¶ added in v0.16.0
func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
Publish adds message into history hub and calls node method to handle message. We don't have any PUB/SUB here as Memory Engine is single node only.
func (*MemoryBroker) PublishControl ¶ added in v0.16.0
func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error
PublishControl - see Broker interface description.
func (*MemoryBroker) PublishJoin ¶ added in v0.16.0
func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error
PublishJoin - see Broker interface description.
func (*MemoryBroker) PublishLeave ¶ added in v0.16.0
func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error
PublishLeave - see Broker interface description.
func (*MemoryBroker) RemoveHistory ¶ added in v0.16.0
func (b *MemoryBroker) RemoveHistory(ch string) error
RemoveHistory - see Broker interface description.
func (*MemoryBroker) Run ¶ added in v0.16.0
func (b *MemoryBroker) Run(h BrokerEventHandler) error
Run runs memory broker.
func (*MemoryBroker) Subscribe ¶ added in v0.16.0
func (b *MemoryBroker) Subscribe(_ string) error
Subscribe is noop here.
func (*MemoryBroker) Unsubscribe ¶ added in v0.16.0
func (b *MemoryBroker) Unsubscribe(_ string) error
Unsubscribe node from channel. Noop here.
type MemoryBrokerConfig ¶ added in v0.16.0
type MemoryBrokerConfig struct{}
MemoryBrokerConfig is a memory broker config.
type MemoryPresenceManager ¶ added in v0.16.0
type MemoryPresenceManager struct {
// contains filtered or unexported fields
}
MemoryPresenceManager is builtin default PresenceManager which allows running Centrifuge-based server without any external storage. All data managed inside process memory.
With this PresenceManager you can only run single Centrifuge node. If you need to scale you should consider using another PresenceManager implementation instead – for example RedisPresenceManager.
Running single node can be sufficient for many use cases especially when you need maximum performance and not too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.
func NewMemoryPresenceManager ¶ added in v0.16.0
func NewMemoryPresenceManager(n *Node, c MemoryPresenceManagerConfig) (*MemoryPresenceManager, error)
NewMemoryPresenceManager initializes MemoryPresenceManager.
func (*MemoryPresenceManager) AddPresence ¶ added in v0.16.0
func (m *MemoryPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error
AddPresence - see PresenceManager interface description.
func (*MemoryPresenceManager) Close ¶ added in v0.17.1
func (m *MemoryPresenceManager) Close(_ context.Context) error
Close is noop for now.
func (*MemoryPresenceManager) Presence ¶ added in v0.16.0
func (m *MemoryPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)
Presence - see PresenceManager interface description.
func (*MemoryPresenceManager) PresenceStats ¶ added in v0.16.0
func (m *MemoryPresenceManager) PresenceStats(ch string) (PresenceStats, error)
PresenceStats - see PresenceManager interface description.
func (*MemoryPresenceManager) RemovePresence ¶ added in v0.16.0
func (m *MemoryPresenceManager) RemovePresence(ch string, clientID string, _ string) error
RemovePresence - see PresenceManager interface description.
type MemoryPresenceManagerConfig ¶ added in v0.16.0
type MemoryPresenceManagerConfig struct{}
MemoryPresenceManagerConfig is a MemoryPresenceManager config.
type MessageEvent ¶
type MessageEvent struct { // Data contains message untouched payload. Data []byte }
MessageEvent contains fields related to message request.
type MessageHandler ¶
type MessageHandler func(MessageEvent)
MessageHandler must handle incoming async message from client. So Centrifuge feels similar to pure WebSocket API. Though in general, we recommend using RPC where possible to send data to a server from a client as it provides a better flow control.
type MetricsConfig ¶ added in v0.34.0
type MetricsConfig struct { // MetricsNamespace is a Prometheus metrics namespace to use for Centrifuge metrics. // If not set then the default metrics namespace name "centrifuge" will be used. MetricsNamespace string // RegistererGatherer is a Prometheus registerer and gatherer. If not set then a // prometheus.DefaultRegisterer and prometheus.DefaultGatherer will be used. RegistererGatherer RegistererGatherer // GetChannelNamespaceLabel if set will be used by Centrifuge to extract channel_namespace // label for channel related metrics. Make sure to maintain low cardinality of returned values // to avoid issues with Prometheus performance. This function may introduce sufficient overhead // since it's called in hot paths - so it should be fast. By default, Centrifuge uses cache // of resolved channel namespace labels to avoid calling this function too often. See below // ChannelNamespaceCacheSize and ChannelNamespaceCacheTTL options to tweak the cache behavior. GetChannelNamespaceLabel func(channel string) string // ChannelNamespaceCacheSize sets the size of the cache for channel namespace label resolution. // Zero value will use cache size equal to 4096. Set -1 to disable cache (in that case make sure // your GetChannelNamespaceLabel is fast and ideally does not allocate because it's called in hot // paths). ChannelNamespaceCacheSize int // ChannelNamespaceCacheTTL sets the time after which resolved channel namespace for a channel // will expire in the cache. If zero – default TTL 10 seconds is used. ChannelNamespaceCacheTTL time.Duration // RegisteredClientNames is an optional list of known client names which will be allowed to be // attached as labels to metrics. If client passed a name which is not in the list – then Centrifuge // will use string "unregistered" as a client_name label. We need to be strict here to avoid // Prometheus cardinality issues. RegisteredClientNames []string // CheckRegisteredClientVersion is a function to check whether the version passed by a client with a // particular name is valid and can be used in metric values. When function is not set or returns // false Centrifuge will use "unregistered" value for a client version. Note, the name argument here // is an original name of client passed to Centrifuge. CheckRegisteredClientVersion func(clientName string, clientVersion string) bool }
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a heart of Centrifuge library – it keeps and manages client connections, maintains information about other Centrifuge nodes in cluster, keeps references to common things (like Broker and PresenceManager, Hub) etc. By default, Node uses in-memory implementations of Broker and PresenceManager - MemoryBroker and MemoryPresenceManager which allow running a single Node only. To scale use other implementations of Broker and PresenceManager like builtin RedisBroker and RedisPresenceManager.
func (*Node) Disconnect ¶
func (n *Node) Disconnect(userID string, opts ...DisconnectOption) error
Disconnect allows closing all user connections on all nodes.
func (*Node) HandleControl ¶ added in v0.33.0
HandleControl coming from Broker.
func (*Node) HandleJoin ¶ added in v0.33.0
func (n *Node) HandleJoin(ch string, info *ClientInfo) error
HandleJoin coming from Broker.
func (*Node) HandleLeave ¶ added in v0.33.0
func (n *Node) HandleLeave(ch string, info *ClientInfo) error
HandleLeave coming from Broker.
func (*Node) HandlePublication ¶ added in v0.33.0
func (n *Node) HandlePublication(ch string, pub *Publication, sp StreamPosition, delta bool, prevPub *Publication) error
HandlePublication coming from Broker.
func (*Node) History ¶
func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error)
History allows extracting Publications in channel. The channel must belong to namespace where history is on.
func (*Node) Notify ¶ added in v0.17.0
Notify allows sending an asynchronous notification to all other nodes (or to a single specific node). Unlike Survey, it does not wait for any response. If toNodeID is not an empty string then a notification will be sent to a concrete node in cluster, otherwise a notification sent to all running nodes. See a corresponding Node.OnNotification method to handle received notifications.
func (*Node) NotifyShutdown ¶
func (n *Node) NotifyShutdown() chan struct{}
NotifyShutdown returns a channel which will be closed on node shutdown.
func (*Node) OnCacheEmpty ¶ added in v0.33.0
func (n *Node) OnCacheEmpty(h CacheEmptyHandler)
OnCacheEmpty allows setting CacheEmptyHandler. CacheEmptyHandler called when client subscribes on a channel with RecoveryModeCache but there is no cached value in channel. In response to this handler it's possible to tell Centrifuge what to do with subscribe request – keep it, or return error.
func (*Node) OnCommandProcessed ¶ added in v0.28.0
func (n *Node) OnCommandProcessed(handler CommandProcessedHandler)
OnCommandProcessed allows setting CommandProcessedHandler. This should be done before Node.Run called.
func (*Node) OnCommandRead ¶ added in v0.27.0
func (n *Node) OnCommandRead(handler CommandReadHandler)
OnCommandRead allows setting CommandReadHandler. This should be done before Node.Run called.
func (*Node) OnConnect ¶ added in v0.10.0
func (n *Node) OnConnect(handler ConnectHandler)
OnConnect allows setting ConnectHandler. ConnectHandler called after client connection successfully established, authenticated and Connect Reply already sent to client. This is a place where application can start communicating with client.
func (*Node) OnConnecting ¶ added in v0.10.0
func (n *Node) OnConnecting(handler ConnectingHandler)
OnConnecting allows setting ConnectingHandler. ConnectingHandler will be called when client sends Connect command to server. In this handler server can reject connection or provide Credentials for it.
func (*Node) OnNodeInfoSend ¶ added in v0.18.0
func (n *Node) OnNodeInfoSend(handler NodeInfoSendHandler)
OnNodeInfoSend allows setting NodeInfoSendHandler. This should be done before Node.Run called.
func (*Node) OnNotification ¶ added in v0.17.0
func (n *Node) OnNotification(handler NotificationHandler)
OnNotification allows setting NotificationHandler. This should be done before Node.Run called.
func (*Node) OnSurvey ¶ added in v0.15.0
func (n *Node) OnSurvey(handler SurveyHandler)
OnSurvey allows setting SurveyHandler. This should be done before Node.Run called.
func (*Node) OnTransportWrite ¶ added in v0.18.0
func (n *Node) OnTransportWrite(handler TransportWriteHandler)
OnTransportWrite allows setting TransportWriteHandler. This should be done before Node.Run called.
func (*Node) Presence ¶
func (n *Node) Presence(ch string) (PresenceResult, error)
Presence returns a map with information about active clients in channel.
func (*Node) PresenceStats ¶
func (n *Node) PresenceStats(ch string) (PresenceStatsResult, error)
PresenceStats returns presence stats from PresenceManager.
func (*Node) Publish ¶
func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error)
Publish sends data to all clients subscribed on channel at this moment. All running nodes will receive Publication and send it to all local channel subscribers.
Data expected to be valid marshaled JSON or any binary payload. Connections that work over JSON protocol can not handle binary payloads. Connections that work over Protobuf protocol can work both with JSON and binary payloads.
So the rule here: if you have channel subscribers that work using JSON protocol then you can not publish binary data to these channel.
Channels in Centrifuge are ephemeral and its settings not persisted over different publish operations. So if you want to have a channel with history stream behind you need to provide WithHistory option on every publish. To simplify working with different channels you can make some type of publish wrapper in your own code.
The returned PublishResult contains embedded StreamPosition that describes position inside stream Publication was added too. For channels without history enabled (i.e. when Publications only sent to PUB/SUB system) StreamPosition will be an empty struct (i.e. PublishResult.Offset will be zero).
func (*Node) Refresh ¶ added in v0.18.0
func (n *Node) Refresh(userID string, opts ...RefreshOption) error
Refresh user connection. Without any options will make user connections non-expiring. Note, that OnRefresh event won't be called in this case since this is a server-side refresh.
func (*Node) RemoveHistory ¶
RemoveHistory removes channel history.
func (*Node) Run ¶
Run performs node startup actions. At moment must be called once on start after Broker set to Node.
func (*Node) SetPresenceManager ¶
func (n *Node) SetPresenceManager(m PresenceManager)
SetPresenceManager allows setting PresenceManager to use.
func (*Node) Shutdown ¶
Shutdown sets shutdown flag to Node so handlers could stop accepting new requests and disconnects clients with shutdown reason.
func (*Node) Subscribe ¶ added in v0.16.0
func (n *Node) Subscribe(userID string, channel string, opts ...SubscribeOption) error
Subscribe subscribes user to a channel. Note, that OnSubscribe event won't be called in this case since this is a server-side subscription. If user have been already subscribed to a channel then its subscription will be updated and subscribe notification will be sent to a client-side.
func (*Node) Survey ¶ added in v0.15.0
func (n *Node) Survey(ctx context.Context, op string, data []byte, toNodeID string) (map[string]SurveyResult, error)
Survey allows collecting data from all running Centrifuge nodes. This method publishes control messages, then waits for replies from all running nodes. The maximum time to wait can be controlled over context timeout. If provided context does not have a deadline for survey then this method uses default 10 seconds timeout. Keep in mind that Survey does not scale very well as number of Centrifuge Node grows. Though it has reasonably good performance to perform rare tasks even with relatively large number of nodes. If toNodeID is not an empty string then a survey will be sent only to the concrete node in a cluster, otherwise a survey sent to all running nodes. See a corresponding Node.OnSurvey method to handle received surveys. Survey ops starting with `centrifuge_` are reserved by Centrifuge library.
func (*Node) Unsubscribe ¶
func (n *Node) Unsubscribe(userID string, channel string, opts ...UnsubscribeOption) error
Unsubscribe unsubscribes user from a channel. If a channel is empty string then user will be unsubscribed from all channels.
type NodeInfo ¶
type NodeInfo struct { UID string Name string Version string NumClients uint32 NumUsers uint32 NumSubs uint32 NumChannels uint32 Uptime uint32 Metrics *Metrics Data []byte }
NodeInfo contains information about node.
type NodeInfoSendHandler ¶ added in v0.18.0
type NodeInfoSendHandler func() NodeInfoSendReply
NodeInfoSendHandler called every time the control node frame is published and allows modifying Node control frame sending. Currently, attaching an arbitrary data to it. See NodeInfoSendReply.
type NodeInfoSendReply ¶ added in v0.18.0
type NodeInfoSendReply struct { // Data allows setting an arbitrary data to the control node frame which is // published by each Node periodically, so it will be available in the // result of Node.Info call for the current Node description. Keep this // data reasonably small. Data []byte }
NodeInfoSendReply can modify sending Node control frame in some ways.
type NotificationEvent ¶ added in v0.17.0
NotificationEvent with Op and Data.
type NotificationHandler ¶ added in v0.17.0
type NotificationHandler func(NotificationEvent)
NotificationHandler allows handling notifications.
type PingPongConfig ¶ added in v0.26.0
type PingPongConfig struct { // PingInterval tells how often to issue server-to-client pings. // For zero value 25 secs will be used. To disable sending app-level pings use -1. PingInterval time.Duration // PongTimeout sets time for pong check after issuing a ping. // For zero value 10 seconds will be used. To disable pong checks use -1. // PongTimeout must be less than PingInterval in current implementation. PongTimeout time.Duration }
PingPongConfig allows configuring application level ping-pong behavior. Note that in current implementation PingPongConfig.PingInterval must be greater than PingPongConfig.PongTimeout.
type PresenceCallback ¶ added in v0.13.0
type PresenceCallback func(PresenceReply, error)
PresenceCallback should be called with PresenceReply or error.
type PresenceEvent ¶ added in v0.9.0
type PresenceEvent struct {
Channel string
}
PresenceEvent has channel operation called for.
type PresenceHandler ¶ added in v0.9.0
type PresenceHandler func(PresenceEvent, PresenceCallback)
PresenceHandler called when presence request received from client.
type PresenceManager ¶
type PresenceManager interface { // Presence returns actual presence information for channel. Presence(ch string) (map[string]*ClientInfo, error) // PresenceStats returns short stats of current presence data // suitable for scenarios when caller does not need full client // info returned by presence method. PresenceStats(ch string) (PresenceStats, error) // AddPresence sets or updates presence information in channel // for connection with specified identifier. PresenceManager should // have a property to expire client information that was not updated // (touched) after some configured time interval. AddPresence(ch string, clientID string, info *ClientInfo) error // RemovePresence removes presence information for connection // with specified client and user identifiers. RemovePresence(ch string, clientID string, userID string) error }
PresenceManager is responsible for channel presence management.
type PresenceReply ¶ added in v0.9.0
type PresenceReply struct {
Result *PresenceResult
}
PresenceReply contains fields determining the reaction on presence request.
type PresenceResult ¶ added in v0.10.0
type PresenceResult struct {
Presence map[string]*ClientInfo
}
PresenceResult wraps presence.
type PresenceStats ¶
type PresenceStats struct { // NumClients is a number of client connections in channel. NumClients int // NumUsers is a number of unique users in channel. NumUsers int }
PresenceStats represents a short presence information for channel.
type PresenceStatsCallback ¶ added in v0.13.0
type PresenceStatsCallback func(PresenceStatsReply, error)
PresenceStatsCallback should be called with PresenceStatsReply or error.
type PresenceStatsEvent ¶ added in v0.10.0
type PresenceStatsEvent struct {
Channel string
}
PresenceStatsEvent has channel operation called for.
type PresenceStatsHandler ¶ added in v0.10.0
type PresenceStatsHandler func(PresenceStatsEvent, PresenceStatsCallback)
PresenceStatsHandler must handle incoming command from client.
type PresenceStatsReply ¶ added in v0.10.0
type PresenceStatsReply struct {
Result *PresenceStatsResult
}
PresenceStatsReply contains fields determining the reaction on presence request.
type PresenceStatsResult ¶ added in v0.10.0
type PresenceStatsResult struct {
PresenceStats
}
PresenceStatsResult wraps presence stats.
type ProtocolType ¶ added in v0.1.0
type ProtocolType string
ProtocolType represents client connection transport encoding format.
const ( // ProtocolTypeJSON means JSON-based protocol. ProtocolTypeJSON ProtocolType = "json" // ProtocolTypeProtobuf means Protobuf protocol. ProtocolTypeProtobuf ProtocolType = "protobuf" )
type ProtocolVersion ¶ added in v0.20.0
type ProtocolVersion uint8
ProtocolVersion defines protocol behavior.
const ( // ProtocolVersion2 is the current stable client protocol. ProtocolVersion2 ProtocolVersion = 2 )
type Publication ¶
type Publication struct { // Offset is an incremental position number inside a history stream. // Zero value means that channel does not maintain Publication stream. Offset uint64 // Data published to a channel. Data []byte // Info is optional information about client connection published this data. Info *ClientInfo // Tags contains a map with custom key-values attached to a Publication. Tags map // will be delivered to a client. Tags map[string]string // Optional time of publication as Unix timestamp milliseconds. At this point // we use it for calculating PUB/SUB time lag, it's not exposed to the client // protocol. Time int64 // Channel is only set when subscription channel does not match channel in Publication. // This is a case for wildcard subscriptions. Client SDK then should use this channel // in PublicationContext. Channel string }
Publication is a data sent to a channel.
type PublishCallback ¶ added in v0.13.0
type PublishCallback func(PublishReply, error)
PublishCallback should be called with PublishReply or error.
type PublishEvent ¶
type PublishEvent struct { // Channel client wants to publish data to. Channel string // Data client wants to publish. Data []byte // ClientInfo about client connection. ClientInfo *ClientInfo }
PublishEvent contains fields related to publish event. Note that this event called before actual publish to Broker so handler has an option to reject this publication returning an error.
type PublishHandler ¶
type PublishHandler func(PublishEvent, PublishCallback)
PublishHandler called when client publishes into channel.
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption is a type to represent various Publish options.
func WithClientInfo ¶ added in v0.13.0
func WithClientInfo(info *ClientInfo) PublishOption
WithClientInfo adds ClientInfo to Publication.
func WithDelta ¶ added in v0.33.0
func WithDelta(enabled bool) PublishOption
WithDelta tells Broker to use delta streaming.
func WithHistory ¶ added in v0.12.0
WithHistory tells Broker to save message to history stream with provided size and ttl.
func WithIdempotencyKey ¶ added in v0.31.0
func WithIdempotencyKey(key string) PublishOption
WithIdempotencyKey tells Broker the idempotency key for the publication. See PublishOptions.IdempotencyKey.
func WithIdempotentResultTTL ¶ added in v0.31.0
func WithIdempotentResultTTL(ttl time.Duration) PublishOption
WithIdempotentResultTTL sets the time of expiration for results of idempotent publications. See PublishOptions.IdempotentResultTTL for more description and defaults.
func WithTags ¶ added in v0.20.0
func WithTags(meta map[string]string) PublishOption
WithTags allows setting Publication.Tags.
type PublishOptions ¶
type PublishOptions struct { // HistoryTTL sets history ttl to expire inactive history streams. // Current Broker implementations only work with seconds resolution for TTL. HistoryTTL time.Duration // HistorySize sets history size limit to prevent infinite stream growth. HistorySize int // HistoryMetaTTL allows overriding default (set in Config.HistoryMetaTTL) // history meta information expiration time upon publish. HistoryMetaTTL time.Duration // ClientInfo to include into Publication. By default, no ClientInfo will be appended. ClientInfo *ClientInfo // Tags to set Publication.Tags. Tags map[string]string // IdempotencyKey is an optional key for idempotent publish. Broker implementation // may cache these keys for some time to prevent duplicate publications. In this case // the returned result is the same as from the previous publication with the same key. IdempotencyKey string // IdempotentResultTTL sets the time of expiration for results of idempotent publications // (publications with idempotency key provided). Memory and Redis engines implement this TTL // with second precision, so don't set something less than one second here. By default, // Centrifuge uses 5 minutes as idempotent result TTL. IdempotentResultTTL time.Duration // UseDelta enables using delta encoding for the publication. UseDelta bool }
PublishOptions define some fields to alter behaviour of Publish operation.
type PublishReply ¶
type PublishReply struct { // Options to control publication. Options PublishOptions // Result if set will tell Centrifuge that message already published to // channel by handler code. In this case Centrifuge won't try to publish // into channel again after handler returned PublishReply. This can be // useful if you need to know new Publication offset in your code, or you // want to make sure message successfully published to Broker on server // side (otherwise only client will get an error). Result *PublishResult }
PublishReply contains fields determining the result on publish.
type PublishResult ¶ added in v0.8.0
type PublishResult struct { StreamPosition FromCache bool }
PublishResult returned from Publish operation.
type RPCCallback ¶ added in v0.13.0
RPCCallback should be called as soon as handler decides what to do with connection RPCEvent.
type RPCEvent ¶
type RPCEvent struct { // Method is an optional string that contains RPC method name client wants to call. // This is an optional field, by default clients send RPC without any method set. Method string // Data contains RPC untouched payload. Data []byte }
RPCEvent contains fields related to rpc request.
type RPCHandler ¶
type RPCHandler func(RPCEvent, RPCCallback)
RPCHandler must handle incoming command from client.
type RPCReply ¶
type RPCReply struct { // Data to return in RPC reply to client. Data []byte }
RPCReply contains fields determining the reaction on rpc request.
type RecoveryMode ¶ added in v0.33.0
type RecoveryMode uint8
const ( RecoveryModeStream RecoveryMode = 0 RecoveryModeCache RecoveryMode = 1 )
type RedisBroker ¶ added in v0.16.0
type RedisBroker struct {
// contains filtered or unexported fields
}
RedisBroker uses Redis to implement Broker functionality. This broker allows scaling Centrifuge-based server to many instances and load balance client connections between them. Centrifuge nodes will be connected over Redis PUB/SUB. RedisBroker supports standalone Redis, Redis in master-replica setup with Sentinel, Redis Cluster. Also, it supports client-side consistent sharding between isolated Redis setups. By default, Redis >= 5 required (due to the fact RedisBroker uses STREAM data structure to keep publication history for a channel).
func NewRedisBroker ¶ added in v0.16.0
func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error)
NewRedisBroker initializes Redis Broker.
func (*RedisBroker) History ¶ added in v0.16.0
func (b *RedisBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error)
History - see Broker.History.
func (*RedisBroker) Publish ¶ added in v0.16.0
func (b *RedisBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, bool, error)
Publish - see Broker.Publish.
func (*RedisBroker) PublishControl ¶ added in v0.16.0
func (b *RedisBroker) PublishControl(data []byte, nodeID string, _ string) error
PublishControl - see Broker.PublishControl.
func (*RedisBroker) PublishJoin ¶ added in v0.16.0
func (b *RedisBroker) PublishJoin(ch string, info *ClientInfo) error
PublishJoin - see Broker.PublishJoin.
func (*RedisBroker) PublishLeave ¶ added in v0.16.0
func (b *RedisBroker) PublishLeave(ch string, info *ClientInfo) error
PublishLeave - see Broker.PublishLeave.
func (*RedisBroker) RemoveHistory ¶ added in v0.16.0
func (b *RedisBroker) RemoveHistory(ch string) error
RemoveHistory - see Broker.RemoveHistory.
func (*RedisBroker) Run ¶ added in v0.16.0
func (b *RedisBroker) Run(h BrokerEventHandler) error
Run – see Broker.Run.
func (*RedisBroker) Subscribe ¶ added in v0.16.0
func (b *RedisBroker) Subscribe(ch string) error
Subscribe - see Broker.Subscribe.
func (*RedisBroker) Unsubscribe ¶ added in v0.16.0
func (b *RedisBroker) Unsubscribe(ch string) error
Unsubscribe - see Broker.Unsubscribe.
type RedisBrokerConfig ¶ added in v0.16.0
type RedisBrokerConfig struct { // Prefix to use before every channel name and key in Redis. By default, // RedisBroker will use prefix "centrifuge". Prefix string // Shards is a slice of RedisShard to use. At least one shard must be provided. // Data will be consistently sharded by channel over provided Redis shards. Shards []*RedisShard // UseLists allows enabling usage of Redis LIST instead of STREAM data // structure to keep history. LIST support exist mostly for backward // compatibility since STREAM seems superior. If you have a use case // where you need to turn on this option in new setup - please share, // otherwise LIST support can be removed at some point in the future. // Iteration over history in reversed order not supported with lists. UseLists bool // Subscribe on replica Redis nodes. This only works for Redis Cluster // and Sentinel setups and requires replica client to be initialized in // each RedisShard using RedisShardConfig.ReplicaClientEnabled. SubscribeOnReplica bool // SkipPubSub enables mode when Redis broker only saves history, without // publishing to channels and using PUB/SUB. SkipPubSub bool // Name of broker, for observability purposes – i.e. becomes part of metrics/logs. // By default, empty string is used. Name string // NumShardedPubSubPartitions when greater than zero allows turning on a mode in which // broker will use Redis Cluster with sharded PUB/SUB feature available in // Redis >= 7: https://redis.io/docs/manual/pubsub/#sharded-pubsub // // To achieve sharded PUB/SUB efficiency RedisBroker reduces 16384 Redis Cluster // slots to the NumShardedPubSubPartitions value and starts a separate PUB/SUB for each // partition. This is necessary because in Centrifuge case one node can work with // thousands of different channels – and we can't afford running a separate // PUB/SUB connection for each of 16384 possible slots. We re-use partition // connection for many channels and make sure that all channels in the partition // point to the same Redis Cluster slot. // // By default, sharded PUB/SUB is not used in Redis Cluster case - Centrifuge uses // globally distributed PUBLISH commands in Redis Cluster where each publish is // distributed to all nodes in Redis Cluster. // // Note (!), that turning on NumShardedPubSubPartitions will cause Centrifuge to generate // different key names for history and different Redis channel names than in the base // Redis Cluster mode due to reasons outlined above. NumShardedPubSubPartitions int // contains filtered or unexported fields }
RedisBrokerConfig is a config for Broker.
type RedisPresenceManager ¶ added in v0.16.0
type RedisPresenceManager struct {
// contains filtered or unexported fields
}
RedisPresenceManager keeps presence in Redis thus allows scaling nodes.
func NewRedisPresenceManager ¶ added in v0.16.0
func NewRedisPresenceManager(n *Node, config RedisPresenceManagerConfig) (*RedisPresenceManager, error)
NewRedisPresenceManager creates new RedisPresenceManager.
func (*RedisPresenceManager) AddPresence ¶ added in v0.16.0
func (m *RedisPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error
AddPresence - see PresenceManager interface description.
func (*RedisPresenceManager) Close ¶ added in v0.26.0
func (m *RedisPresenceManager) Close(_ context.Context) error
func (*RedisPresenceManager) Presence ¶ added in v0.16.0
func (m *RedisPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)
Presence - see PresenceManager interface description.
func (*RedisPresenceManager) PresenceStats ¶ added in v0.16.0
func (m *RedisPresenceManager) PresenceStats(ch string) (PresenceStats, error)
PresenceStats - see PresenceManager interface description.
func (*RedisPresenceManager) RemovePresence ¶ added in v0.16.0
func (m *RedisPresenceManager) RemovePresence(ch string, clientID string, userID string) error
RemovePresence - see PresenceManager interface description.
type RedisPresenceManagerConfig ¶ added in v0.16.0
type RedisPresenceManagerConfig struct { // Prefix to use before every channel name and key in Redis. By default, // "centrifuge" prefix will be used. Prefix string // PresenceTTL is an interval how long to consider presence info // valid after receiving presence update. This allows to automatically // clean up unnecessary presence entries after TTL passed. Zero value // means 60 seconds. PresenceTTL time.Duration // Shards is a slice of RedisShard to use. At least one shard must be provided. // Data will be consistently sharded by channel over provided Redis shards. Shards []*RedisShard // EnableUserMapping when returns true tells RedisPresenceManager to additionally store // user to num client connections hash map and sorted set with unique users in Redis. // This increases Redis memory usage since additional structures are used, but provides // a way to optimize presence stats retrieving as we can calculate stats quickly on // Redis side instead of loading the entire presence information. By default, user mapping // is not maintained. EnableUserMapping func(channel string) bool // UseHashFieldTTL allows using HEXPIRE command to set TTL for hash field. It's only available // since Redis 7.4.0 thus disabled by default. Using hash field TTL can be useful to avoid // maintaining expiration index in ZSET – so both useful from the throughput and memory usage // perspective. UseHashFieldTTL bool // ReadFromReplica enables reading presence information from replica Redis servers. // This only works in Redis Cluster and Sentinel setups and requires replica client // to be initialized in each RedisShard using RedisShardConfig.ReplicaClientEnabled. ReadFromReplica bool }
RedisPresenceManagerConfig is a config for RedisPresenceManager.
type RedisShard ¶ added in v0.16.0
type RedisShard struct {
// contains filtered or unexported fields
}
func NewRedisShard ¶ added in v0.16.0
func NewRedisShard(_ *Node, conf RedisShardConfig) (*RedisShard, error)
NewRedisShard initializes new Redis shard.
func (*RedisShard) Close ¶ added in v0.26.0
func (s *RedisShard) Close()
type RedisShardConfig ¶
type RedisShardConfig struct { // Address is a Redis server connection address. Address can be: // - host:port // - tcp://[[[user]:password]@]host:port[/db][?option1=value1&optionN=valueN] // - redis://[[[user]:password]@]host:port[/db][?option1=value1&optionN=valueN] // - unix://[[[user]:password]@]path[?option1=value1&optionN=valueN] // It's also possible to use Address with redis+sentinel:// and redis+cluster:// // schemes when connecting to Redis Sentinel and Redis Cluster respectively. // Examples: // - redis+sentinel://[[[user]:password]@]host:port?sentinel_master_name=mymaster // - redis+cluster://[[[user]:password]@]host:port[?addr=host2:port2&addr=host3:port3] // If you need to connect to Redis Cluster then you need to provide ClusterAddresses // or must use redis+cluster:// scheme in Address. // If you need to connect to Redis Sentinel then you need to provide SentinelAddresses // or must use redis+sentinel:// scheme in Address. // I.e. Centrifuge requires you to explicitly specify the type of Redis setup you want // to connect to. Address string // ClusterAddresses is a slice of seed cluster addresses to connect to. // Each address should be in form of host:port. If ClusterAddresses set then // RedisShardConfig.Address not used at all. ClusterAddresses []string // SentinelAddresses is a slice of Sentinel addresses. Each address should // be in form of host:port. If set then Redis address will be automatically // discovered from Sentinel. For Sentinel the name of the master instance // Sentinel monitors (SentinelMasterName) must be provided. If SentinelAddresses // set then RedisShardConfig.Address not used at all. SentinelAddresses []string // SentinelMasterName is a name of Redis instance master Sentinel monitors. SentinelMasterName string // SentinelUser is a user for Sentinel ACL-based auth. SentinelUser string // SentinelPassword is a password for Sentinel. Works with Sentinel >= 5.0.1. SentinelPassword string // SentinelClientName is a client name for established connections to Sentinel. SentinelClientName string // SentinelTLSConfig is a TLS configuration for Sentinel connections. SentinelTLSConfig *tls.Config // DB is Redis database number. If not set then database 0 used. // Does not make sense in Redis Cluster case. DB int // User is a username for Redis ACL-based auth. User string // Password is password to use when connecting to Redis. If zero then password not used. Password string // ClientName for established connections with Redis. See https://redis.io/commands/client-setname/ ClientName string // TLSConfig contains connection TLS configuration. TLSConfig *tls.Config // ConnectTimeout is a timeout on connect operation. // By default, 1 second is used. ConnectTimeout time.Duration // IOTimeout is a timeout on Redis connection operations. This is used as a write deadline // for connection, also Redis client we use internally periodically (once in a second) PINGs // Redis with this timeout for PING operation to find out stale/broken/blocked connections. // By default, 4 seconds is used. IOTimeout time.Duration // ForceRESP2 if set to true forces using RESP2 protocol for communicating with Redis. // By default, Redis client tries to detect supported Redis protocol automatically // trying RESP3 first. ForceRESP2 bool // ReplicaClientEnabled once set to true will initialize replica client for this shard. // Replica client can then be used for read-only operations from replica nodes in Redis // Cluster or Redis Sentinel setups (single Redis is not allowed). Replica client will // be initialized with the same options as the main client but with ReplicaOnly option // set to true. ReplicaClientEnabled bool }
RedisShardConfig contains Redis connection options.
type RefreshCallback ¶ added in v0.13.0
type RefreshCallback func(RefreshReply, error)
RefreshCallback should be called as soon as handler decides what to do with connection refresh event.
type RefreshEvent ¶
type RefreshEvent struct { // ClientSideRefresh is true for refresh initiated by client-side refresh workflow. ClientSideRefresh bool // Token will only be set in case of using client-side refresh mechanism. Token string }
RefreshEvent contains fields related to refresh event.
type RefreshHandler ¶
type RefreshHandler func(RefreshEvent, RefreshCallback)
RefreshHandler called when it's time to validate client connection and update its expiration time if it's still actual.
Centrifuge library supports two ways of refreshing connection: client-side and server-side.
The default mechanism is server-side, this means that as soon refresh handler set and connection expiration time happens (by timer) – refresh handler will be called.
If ClientSideRefresh in ConnectReply inside ConnectingHandler set to true then library uses client-side refresh mechanism. In this case library relies on Refresh commands sent from client periodically to refresh connection. Refresh command contains updated connection token.
type RefreshOption ¶ added in v0.18.0
type RefreshOption func(options *RefreshOptions)
RefreshOption is a type to represent various Refresh options.
func WithRefreshClient ¶ added in v0.18.0
func WithRefreshClient(clientID string) RefreshOption
WithRefreshClient to limit refresh only for specified client ID.
func WithRefreshExpireAt ¶ added in v0.18.0
func WithRefreshExpireAt(expireAt int64) RefreshOption
WithRefreshExpireAt to set unix seconds in the future when connection should expire. Zero value means no expiration.
func WithRefreshExpired ¶ added in v0.18.0
func WithRefreshExpired(expired bool) RefreshOption
WithRefreshExpired to set expired flag - connection will be closed with DisconnectExpired.
func WithRefreshInfo ¶ added in v0.18.0
func WithRefreshInfo(info []byte) RefreshOption
WithRefreshInfo to override connection info.
func WithRefreshSession ¶ added in v0.21.1
func WithRefreshSession(sessionID string) RefreshOption
WithRefreshSession to limit refresh only for specified session ID.
type RefreshOptions ¶ added in v0.18.0
type RefreshOptions struct { // Expired can close connection with expired reason. Expired bool // ExpireAt defines time in future when subscription should expire, // zero value means no expiration. ExpireAt int64 // Info defines custom channel information, zero value means no channel information. Info []byte // contains filtered or unexported fields }
RefreshOptions ...
type RefreshReply ¶
type RefreshReply struct { // Expired tells Centrifuge that connection expired. In this case connection will be // closed with DisconnectExpired. Expired bool // ExpireAt defines time in future when connection should expire, // zero value means no expiration. ExpireAt int64 // Info allows modifying connection information, // zero value means no modification of current connection Info. Info []byte }
RefreshReply contains fields determining the reaction on refresh event.
type RegistererGatherer ¶ added in v0.34.0
type RegistererGatherer interface { prometheus.Registerer prometheus.Gatherer }
RegistererGatherer defines an interface that combines Registerer and Gatherer from Prometheus. Prometheus Registry implements both interfaces.
type SSEConfig ¶ added in v0.23.0
type SSEConfig struct { PingPongConfig // MaxRequestBodySize limits initial request body size (when SSE starts with POST). MaxRequestBodySize int }
SSEConfig represents config for SSEHandler.
type SSEHandler ¶ added in v0.23.0
type SSEHandler struct {
// contains filtered or unexported fields
}
SSEHandler handles WebSocket client connections. WebSocket protocol is a bidirectional connection between a client and a server for low-latency communication.
func NewSSEHandler ¶ added in v0.23.0
func NewSSEHandler(node *Node, config SSEConfig) *SSEHandler
NewSSEHandler creates new SSEHandler.
func (*SSEHandler) ServeHTTP ¶ added in v0.23.0
func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
type StateSnapshotHandler ¶ added in v0.23.0
StateSnapshotHandler must return a copy of current client's internal state. Returning a copy is important to avoid data races.
type StreamPosition ¶ added in v0.8.0
type StreamPosition struct { // Offset defines publication incremental offset inside a stream. Offset uint64 // Epoch allows handling situations when storage // lost stream entirely for some reason (expired or lost after restart) and we // want to track this fact to prevent successful recovery from another stream. // I.e. for example we have a stream [1, 2, 3], then it's lost and new stream // contains [1, 2, 3, 4], client that recovers from position 3 will only receive // publication 4 missing 1, 2, 3 from new stream. With epoch, we can tell client // that correct recovery is not possible. Epoch string }
StreamPosition contains fields to describe position in stream. At moment this is used for automatic recovery mechanics. More info about stream recovery in Centrifugo docs: https://centrifugal.dev/docs/server/history_and_recovery.
type SubRefreshCallback ¶ added in v0.13.0
type SubRefreshCallback func(SubRefreshReply, error)
SubRefreshCallback should be called as soon as handler decides what to do with connection SubRefreshEvent.
type SubRefreshEvent ¶
type SubRefreshEvent struct { // ClientSideRefresh is true for refresh initiated by client-side subscription // refresh workflow. ClientSideRefresh bool // Channel to which SubRefreshEvent belongs to. Channel string // Token will only be set in case of using client-side subscription refresh mechanism. Token string }
SubRefreshEvent contains fields related to subscription refresh event.
type SubRefreshHandler ¶
type SubRefreshHandler func(SubRefreshEvent, SubRefreshCallback)
SubRefreshHandler called when it's time to validate client subscription to channel and update it's state if needed.
If ClientSideRefresh in SubscribeReply inside SubscribeHandler set to true then library uses client-side subscription refresh mechanism. In this case library relies on SubRefresh commands sent from client periodically to refresh subscription. SubRefresh command contains updated subscription token.
type SubRefreshReply ¶
type SubRefreshReply struct { // Expired tells Centrifuge that subscription expired. In this case connection will be // closed with DisconnectExpired. Expired bool // ExpireAt is a new Unix time of expiration. Zero value means no expiration. ExpireAt int64 // Info is a new channel-scope info. Zero value means do not change previous one. Info []byte }
SubRefreshReply contains fields determining the reaction on subscription refresh event.
type SubscribeCallback ¶ added in v0.13.0
type SubscribeCallback func(SubscribeReply, error)
SubscribeCallback should be called as soon as handler decides what to do with connection subscribe event.
type SubscribeEvent ¶
type SubscribeEvent struct { // Channel client wants to subscribe to. Channel string // Token will only be set for token channels. This is a task of application // to check that subscription to a channel has valid token. Token string // Data received from client as part of Subscribe Command. Data []byte // Positioned is true when Client wants to create subscription with positioned property. Positioned bool // Recoverable is true when Client wants to create subscription with recoverable property. Recoverable bool // JoinLeave is true when Client wants to receive join/leave messages. JoinLeave bool }
SubscribeEvent contains fields related to subscribe event.
type SubscribeHandler ¶
type SubscribeHandler func(SubscribeEvent, SubscribeCallback)
SubscribeHandler called when client wants to subscribe on channel.
type SubscribeOption ¶ added in v0.16.0
type SubscribeOption func(*SubscribeOptions)
SubscribeOption is a type to represent various Subscribe options.
func WithChannelInfo ¶ added in v0.16.0
func WithChannelInfo(chanInfo []byte) SubscribeOption
WithChannelInfo ...
func WithEmitJoinLeave ¶ added in v0.24.0
func WithEmitJoinLeave(enabled bool) SubscribeOption
WithEmitJoinLeave ...
func WithEmitPresence ¶ added in v0.24.0
func WithEmitPresence(enabled bool) SubscribeOption
WithEmitPresence ...
func WithExpireAt ¶ added in v0.16.0
func WithExpireAt(expireAt int64) SubscribeOption
WithExpireAt allows setting ExpireAt field.
func WithPositioning ¶ added in v0.24.0
func WithPositioning(enabled bool) SubscribeOption
WithPositioning ...
func WithPushJoinLeave ¶ added in v0.24.0
func WithPushJoinLeave(enabled bool) SubscribeOption
WithPushJoinLeave ...
func WithRecoverSince ¶ added in v0.18.0
func WithRecoverSince(since *StreamPosition) SubscribeOption
WithRecoverSince allows setting SubscribeOptions.RecoverFrom.
func WithRecovery ¶ added in v0.24.0
func WithRecovery(enabled bool) SubscribeOption
WithRecovery ...
func WithRecoveryMode ¶ added in v0.33.0
func WithRecoveryMode(mode RecoveryMode) SubscribeOption
WithRecoveryMode ...
func WithSubscribeClient ¶ added in v0.16.0
func WithSubscribeClient(clientID string) SubscribeOption
WithSubscribeClient allows setting client ID that should be subscribed. This option not used when Client.Subscribe called.
func WithSubscribeData ¶ added in v0.16.0
func WithSubscribeData(data []byte) SubscribeOption
WithSubscribeData allows setting custom data to send with subscribe push.
func WithSubscribeHistoryMetaTTL ¶ added in v0.29.0
func WithSubscribeHistoryMetaTTL(metaTTL time.Duration) SubscribeOption
WithSubscribeHistoryMetaTTL allows setting SubscribeOptions.HistoryMetaTTL.
func WithSubscribeSession ¶ added in v0.21.1
func WithSubscribeSession(sessionID string) SubscribeOption
WithSubscribeSession allows setting session ID that should be subscribed. This option not used when Client.Subscribe called.
func WithSubscribeSource ¶ added in v0.26.0
func WithSubscribeSource(source uint8) SubscribeOption
WithSubscribeSource allows setting SubscribeOptions.Source.
type SubscribeOptions ¶ added in v0.13.0
type SubscribeOptions struct { // ExpireAt defines time in future when subscription should expire, // zero value means no expiration. ExpireAt int64 // ChannelInfo defines custom channel information, zero value means no channel information. ChannelInfo []byte // EmitPresence turns on participating in channel presence - i.e. client // subscription will emit presence updates to PresenceManager and will be visible // in a channel presence result. EmitPresence bool // EmitJoinLeave turns on emitting Join and Leave events from the subscribing client. // See also PushJoinLeave if you want current client to receive join/leave messages. EmitJoinLeave bool // PushJoinLeave turns on receiving channel Join and Leave events by the client. // Subscriptions which emit join/leave events should have EmitJoinLeave on. PushJoinLeave bool // When position is on client will additionally sync its position inside a stream // to prevent publication loss. The loss can happen due to at most once guarantees // of PUB/SUB model. Make sure you are enabling EnablePositioning in channels that // maintain Publication history stream. When EnablePositioning is on Centrifuge will // include StreamPosition information to subscribe response - for a client to be // able to manually track its position inside a stream. EnablePositioning bool // EnableRecovery turns on automatic recovery for a channel. In this case // client will try to recover missed messages upon resubscribe to a channel // after reconnect to a server. This option also enables client position // tracking inside a stream (i.e. enabling EnableRecovery will automatically // enable EnablePositioning option) to prevent occasional publication loss. // Make sure you are using EnableRecovery in channels that maintain Publication // history stream. EnableRecovery bool // RecoveryMode is by default RecoveryModeStream, but can be also RecoveryModeCache. RecoveryMode RecoveryMode // Data to send to a client with Subscribe Push. Data []byte // RecoverSince will try to subscribe a client and recover from a certain StreamPosition. RecoverSince *StreamPosition // HistoryMetaTTL allows to override default (set in Config.HistoryMetaTTL) history // meta information expiration time. HistoryMetaTTL time.Duration // AllowedDeltaTypes is a whitelist of DeltaType subscribers can negotiate. At this point Centrifuge // only supports DeltaTypeFossil. If zero value – clients won't be able to negotiate delta encoding // within a channel and will receive full data in publications. // Delta encoding is an EXPERIMENTAL feature and may be changed. AllowedDeltaTypes []DeltaType // Source is a way to mark the source of Subscription - i.e. where it comes from. May be useful // for inspection of a connection during its lifetime. Source uint8 // contains filtered or unexported fields }
SubscribeOptions define per-subscription options.
type SubscribeReply ¶
type SubscribeReply struct { // Options to control subscription. Options SubscribeOptions // ClientSideRefresh tells library to use client-side refresh logic: i.e. send // SubRefresh commands with new Subscription Token. If not set then server-side // SubRefresh handler will be used. ClientSideRefresh bool // SubscriptionReady channel if provided will be closed as soon as Centrifuge // written subscribe reply to the connection, so it's possible to start writing // publications into a channel using experimental Client.WritePublication method. // In usual flow you don't need to provide this channel at all. // This is EXPERIMENTAL and may be removed in the future. SubscriptionReady chan struct{} }
SubscribeReply contains fields determining the reaction on subscribe event.
type SubscribeRequest ¶ added in v0.16.0
type SubscribeRequest struct { // Recover enables publication recovery for a channel. Recover bool // Epoch last seen by a client. Epoch string // Offset last seen by a client. Offset uint64 }
SubscribeRequest contains state of subscription to a channel.
type SurveyCallback ¶ added in v0.15.0
type SurveyCallback func(SurveyReply)
SurveyCallback should be called with SurveyReply as soon as survey completed.
type SurveyEvent ¶ added in v0.15.0
SurveyEvent with Op and Data of survey.
type SurveyHandler ¶ added in v0.15.0
type SurveyHandler func(SurveyEvent, SurveyCallback)
SurveyHandler allows setting survey handler function.
type SurveyReply ¶ added in v0.15.0
SurveyReply contains survey reply fields.
type SurveyResult ¶ added in v0.15.0
SurveyResult from node.
type Transport ¶
type Transport interface { TransportInfo // Write should write single push data into a connection. Every byte slice // here is a single Reply (or Push for unidirectional transport) encoded // according transport ProtocolType. Write([]byte) error // WriteMany should write data into a connection. Every byte slice here is a // single Reply (or Push for unidirectional transport) encoded according // transport ProtocolType. // The reason why we have both Write and WriteMany here is to have a path // without extra allocations for massive broadcasts (since variadic args cause // allocation). WriteMany(...[]byte) error // Close must close transport. Transport implementation can optionally // handle Disconnect passed here. For example builtin WebSocket transport // sends Disconnect as part of websocket.CloseMessage. Close(Disconnect) error }
Transport abstracts a connection transport between server and client. It does not contain Read method as reading can be handled by connection handler code (for example by WebsocketHandler.ServeHTTP).
type TransportInfo ¶
type TransportInfo interface { // Name returns a name of transport. Name() string // Protocol returns an underlying transport protocol type used by transport. // JSON or Protobuf protocol types are supported by Centrifuge. Message encoding // happens of Client level. Protocol() ProtocolType // ProtocolVersion returns protocol version used by transport. ProtocolVersion() ProtocolVersion // Unidirectional returns whether transport is unidirectional. For // unidirectional transports Client writes Push protobuf messages // without additional wrapping pushes into Reply type. Unidirectional() bool // Emulation must return true for transport that uses Centrifuge emulation layer. // See EmulationHandler for more details. Emulation() bool // DisabledPushFlags returns a disabled push flags for specific transport. // For example this allows to disable sending Disconnect push in case of // bidirectional WebSocket implementation since disconnect data sent inside // Close frame. DisabledPushFlags() uint64 // PingPongConfig returns application-level server-to-client ping // configuration. PingPongConfig() PingPongConfig }
TransportInfo has read-only transport description methods. Some of these methods can modify the behaviour of Client.
type TransportWriteEvent ¶ added in v0.18.0
type TransportWriteEvent struct { // Data represents single Centrifuge protocol message which is going to be sent // into the connection. For unidirectional transports this is an encoded protocol.Push // type, for bidirectional transports this is an encoded protocol.Reply type. Data []byte // Channel will be set if TransportWriteEvent relates to some channel. Channel string // FrameType tells what is being sent inside Data. FrameType protocol.FrameType }
TransportWriteEvent called just before sending data into the client connection. The event is triggered from inside each client's message queue consumer – so it should not directly affect Hub broadcast latencies.
type TransportWriteHandler ¶ added in v0.18.0
type TransportWriteHandler func(*Client, TransportWriteEvent) bool
TransportWriteHandler called just before writing data to the Transport. At this moment application can skip sending data to a client returning false from a handler. The main purpose of this handler is not a message filtering based on data content but rather tracing stuff.
type Unsubscribe ¶ added in v0.23.0
type Unsubscribe struct { // Code is unsubscribe code. Several unsubscribe codes already used by // a library, see for example UnsubscribeCodeClient, UnsubscribeCodeDisconnect, // UnsubscribeCodeServer, UnsubscribeCodeInsufficient. In theory, we can also // allow applications to set their custom unsubscribe codes in the future. Code uint32 `json:"code"` // Reason is a short description of unsubscribe code for humans. Suitable for // logs for better connection behavior observability. Reason string `json:"reason,omitempty"` }
Unsubscribe describes how client must be unsubscribed (or was unsubscribed) from a channel. Codes for unsubscribe advices going to client connections must be in range [2000, 2999]. Unsubscribe codes >= 2500 coming from server to client result into resubscribe attempt. Codes [0, 2099] and [2500, 2599] are reserved for Centrifuge library internal use and must not be used by applications to create custom Unsubscribe structs.
func (Unsubscribe) String ¶ added in v0.23.0
func (d Unsubscribe) String() string
String representation.
type UnsubscribeEvent ¶
type UnsubscribeEvent struct { // Channel client unsubscribed from. Channel string // ServerSide set to true for server-side subscription unsubscribe events. ServerSide bool // Unsubscribe identifies the source of unsubscribe (i.e. why unsubscribed event happened). Unsubscribe // Disconnect can be additionally set to identify the reason of disconnect when Unsubscribe.Code // is UnsubscribeCodeDisconnect - i.e. when unsubscribe caused by a client disconnection process. // Otherwise, it's nil. Disconnect *Disconnect }
UnsubscribeEvent contains fields related to unsubscribe event.
type UnsubscribeHandler ¶
type UnsubscribeHandler func(UnsubscribeEvent)
UnsubscribeHandler called when client unsubscribed from channel.
type UnsubscribeOption ¶ added in v0.5.0
type UnsubscribeOption func(options *UnsubscribeOptions)
UnsubscribeOption is a type to represent various Unsubscribe options.
func WithCustomUnsubscribe ¶ added in v0.23.0
func WithCustomUnsubscribe(unsubscribe Unsubscribe) UnsubscribeOption
WithCustomUnsubscribe allows setting custom Unsubscribe.
func WithUnsubscribeClient ¶ added in v0.16.0
func WithUnsubscribeClient(clientID string) UnsubscribeOption
WithUnsubscribeClient allows setting client ID that should be unsubscribed. This option not used when Client.Unsubscribe called.
func WithUnsubscribeSession ¶ added in v0.21.1
func WithUnsubscribeSession(sessionID string) UnsubscribeOption
WithUnsubscribeSession allows setting session ID that should be unsubscribed. This option not used when Client.Unsubscribe called.
type UnsubscribeOptions ¶ added in v0.5.0
type UnsubscribeOptions struct {
// contains filtered or unexported fields
}
UnsubscribeOptions ...
type WebsocketConfig ¶
type WebsocketConfig struct { // CheckOrigin func to provide custom origin check logic. // nil means that sameHostOriginCheck function will be used which // expects Origin host to match request Host. CheckOrigin func(r *http.Request) bool // ReadBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. ReadBufferSize int // WriteBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. WriteBufferSize int // UseWriteBufferPool enables using buffer pool for writes. UseWriteBufferPool bool // MessageSizeLimit sets the maximum size in bytes of allowed message from client. // By default, 65536 bytes (64KB) will be used. MessageSizeLimit int // WriteTimeout is maximum time of write message operation. // Slow client will be disconnected. // By default, 1 * time.Second will be used. WriteTimeout time.Duration // Compression allows enabling websocket permessage-deflate // compression support for raw websocket connections. It does // not guarantee that compression will be used - i.e. it only // says that server will try to negotiate it with client. // Note: enabling compression may lead to performance degradation. Compression bool // CompressionLevel sets a level for websocket compression. // See possible value description at https://golang.org/pkg/compress/flate/#NewWriter CompressionLevel int // CompressionMinSize allows setting minimal limit in bytes for // message to use compression when writing it into client connection. // By default, it's 0 - i.e. all messages will be compressed when // WebsocketCompression enabled and compression negotiated with client. CompressionMinSize int // CompressionPreparedMessageCacheSize when greater than zero tells Centrifuge to use // prepared WebSocket messages for connections with compression. This generally introduces // overhead but at the same time may drastically reduce compression memory and CPU spikes // during broadcasts. See also BenchmarkWsBroadcastCompressionCache. // This option is EXPERIMENTAL, do not use in production. Contact maintainers if it // works well for your use case, and you want to enable it in production. CompressionPreparedMessageCacheSize int64 PingPongConfig }
WebsocketConfig represents config for WebsocketHandler.
type WebsocketHandler ¶
type WebsocketHandler struct {
// contains filtered or unexported fields
}
WebsocketHandler handles WebSocket client connections. WebSocket protocol is a bidirectional connection between a client and a server for low-latency communication.
func NewWebsocketHandler ¶
func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler
NewWebsocketHandler creates new WebsocketHandler.
func (*WebsocketHandler) ServeHTTP ¶
func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)
Source Files ¶
- broker.go
- broker_memory.go
- broker_redis.go
- channel_medium.go
- client.go
- client_experimental.go
- config.go
- credentials.go
- disconnect.go
- doc.go
- emulation.go
- errors.go
- events.go
- handler_http_stream.go
- handler_sse.go
- handler_websocket.go
- hub.go
- logging.go
- metrics.go
- node.go
- options.go
- presence.go
- presence_memory.go
- presence_redis.go
- redis_shard.go
- transport.go
- unsubscribe.go
- writer.go
Directories ¶
Path | Synopsis |
---|---|
_examples
module
|
|
internal
|
|
priority
Package priority provides priority queue.
|
Package priority provides priority queue. |
websocket
Package websocket implements the WebSocket protocol defined in RFC 6455.
|
Package websocket implements the WebSocket protocol defined in RFC 6455. |
websocket/examples/autobahn
Command server is a test server for the Autobahn WebSockets Test Suite.
|
Command server is a test server for the Autobahn WebSockets Test Suite. |