Documentation ¶
Overview ¶
Package centrifuge is a real-time messaging library that abstracts several bidirectional transports (Websocket, SockJS) and provides primitives to build real-time applications with Go. It's also used as core of Centrifugo server (https://github.com/centrifugal/centrifugo).
The API of this library is almost all goroutine-safe except cases where one-time operations like setting callback handlers performed. Library expects that code inside callbacks will not block.
Centrifuge library provides several features on top of plain Websocket implementation - read highlights in library README on Github – https://github.com/centrifugal/centrifuge.
Also check out examples in repo to see main library concepts in action.
Index ¶
- Constants
- Variables
- func LogLevelToString(l LogLevel) string
- func SetCredentials(ctx context.Context, cred *Credentials) context.Context
- type Broker
- type BrokerEventHandler
- type ChannelContext
- type ChannelNamespace
- type ChannelOptions
- type Client
- func (c *Client) Channels() map[string]ChannelContext
- func (c *Client) Close(disconnect *Disconnect) error
- func (c *Client) Handle(data []byte) bool
- func (c *Client) ID() string
- func (c *Client) On() *ClientEventHub
- func (c *Client) Send(data []byte) error
- func (c *Client) Subscribe(channel string) error
- func (c *Client) Transport() TransportInfo
- func (c *Client) Unsubscribe(ch string, opts ...UnsubscribeOption) error
- func (c *Client) UserID() string
- type ClientEventHub
- func (c *ClientEventHub) Disconnect(h DisconnectHandler)
- func (c *ClientEventHub) Message(h MessageHandler)
- func (c *ClientEventHub) Publish(h PublishHandler)
- func (c *ClientEventHub) RPC(h RPCHandler)
- func (c *ClientEventHub) SubRefresh(h SubRefreshHandler)
- func (c *ClientEventHub) Subscribe(h SubscribeHandler)
- func (c *ClientEventHub) Unsubscribe(h UnsubscribeHandler)
- type ClientInfo
- type Closer
- type Config
- type ConnectEvent
- type ConnectReply
- type ConnectedHandler
- type ConnectingHandler
- type Credentials
- type Disconnect
- type DisconnectEvent
- type DisconnectHandler
- type DisconnectOption
- type DisconnectOptions
- type DisconnectReply
- type EncodingType
- type Engine
- type Error
- type HistoryFilter
- type HistoryManager
- type HistoryOption
- type HistoryOptions
- type HistoryResult
- type Hub
- type Info
- type LogEntry
- type LogHandler
- type LogLevel
- type MemoryEngine
- func (e *MemoryEngine) AddHistory(ch string, pub *protocol.Publication, opts *ChannelOptions) (StreamPosition, bool, error)
- func (e *MemoryEngine) AddPresence(ch string, uid string, info *protocol.ClientInfo, _ time.Duration) error
- func (e *MemoryEngine) Channels() ([]string, error)
- func (e *MemoryEngine) History(ch string, filter HistoryFilter) ([]*protocol.Publication, StreamPosition, error)
- func (e *MemoryEngine) Presence(ch string) (map[string]*protocol.ClientInfo, error)
- func (e *MemoryEngine) PresenceStats(ch string) (PresenceStats, error)
- func (e *MemoryEngine) Publish(ch string, pub *protocol.Publication, _ *ChannelOptions) error
- func (e *MemoryEngine) PublishControl(data []byte) error
- func (e *MemoryEngine) PublishJoin(ch string, join *protocol.Join, _ *ChannelOptions) error
- func (e *MemoryEngine) PublishLeave(ch string, leave *protocol.Leave, _ *ChannelOptions) error
- func (e *MemoryEngine) RemoveHistory(ch string) error
- func (e *MemoryEngine) RemovePresence(ch string, uid string) error
- func (e *MemoryEngine) Run(h BrokerEventHandler) error
- func (e *MemoryEngine) Subscribe(_ string) error
- func (e *MemoryEngine) Unsubscribe(_ string) error
- type MemoryEngineConfig
- type MessageEvent
- type MessageHandler
- type MessageReply
- type Metrics
- type Node
- func (n *Node) ChannelOpts(ch string) (ChannelOptions, bool)
- func (n *Node) Channels() ([]string, error)
- func (n *Node) Config() Config
- func (n *Node) Disconnect(user string, opts ...DisconnectOption) error
- func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error)
- func (n *Node) Hub() *Hub
- func (n *Node) Info() (Info, error)
- func (n *Node) Log(entry LogEntry)
- func (n *Node) LogEnabled(level LogLevel) bool
- func (n *Node) NotifyShutdown() chan struct{}
- func (n *Node) On() NodeEventHub
- func (n *Node) PersonalChannel(user string) string
- func (n *Node) Presence(ch string) (map[string]*ClientInfo, error)
- func (n *Node) PresenceStats(ch string) (PresenceStats, error)
- func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error)
- func (n *Node) Reload(c Config) error
- func (n *Node) RemoveHistory(ch string) error
- func (n *Node) Run() error
- func (n *Node) SetBroker(b Broker)
- func (n *Node) SetEngine(e Engine)
- func (n *Node) SetHistoryManager(m HistoryManager)
- func (n *Node) SetPresenceManager(m PresenceManager)
- func (n *Node) Shutdown(ctx context.Context) error
- func (n *Node) Unsubscribe(user string, ch string, opts ...UnsubscribeOption) error
- type NodeEventHub
- type NodeInfo
- type PresenceManager
- type PresenceStats
- type ProtocolType
- type Publication
- type PublishEvent
- type PublishHandler
- type PublishOption
- type PublishOptions
- type PublishReply
- type PublishResult
- type RPCEvent
- type RPCHandler
- type RPCReply
- type Raw
- type RedisEngine
- func (e *RedisEngine) AddHistory(ch string, pub *protocol.Publication, opts *ChannelOptions) (StreamPosition, bool, error)
- func (e *RedisEngine) AddPresence(ch string, uid string, info *protocol.ClientInfo, exp time.Duration) error
- func (e *RedisEngine) Channels() ([]string, error)
- func (e *RedisEngine) History(ch string, filter HistoryFilter) ([]*protocol.Publication, StreamPosition, error)
- func (e *RedisEngine) Presence(ch string) (map[string]*protocol.ClientInfo, error)
- func (e *RedisEngine) PresenceStats(ch string) (PresenceStats, error)
- func (e *RedisEngine) Publish(ch string, pub *protocol.Publication, opts *ChannelOptions) error
- func (e *RedisEngine) PublishControl(data []byte) error
- func (e *RedisEngine) PublishJoin(ch string, join *protocol.Join, opts *ChannelOptions) error
- func (e *RedisEngine) PublishLeave(ch string, leave *protocol.Leave, opts *ChannelOptions) error
- func (e *RedisEngine) RemoveHistory(ch string) error
- func (e *RedisEngine) RemovePresence(ch string, uid string) error
- func (e *RedisEngine) Run(h BrokerEventHandler) error
- func (e *RedisEngine) Subscribe(ch string) error
- func (e *RedisEngine) Unsubscribe(ch string) error
- type RedisEngineConfig
- type RedisShardConfig
- type RefreshEvent
- type RefreshHandler
- type RefreshReply
- type SockjsConfig
- type SockjsHandler
- type StreamPosition
- type SubRefreshEvent
- type SubRefreshHandler
- type SubRefreshReply
- type SubscribeEvent
- type SubscribeHandler
- type SubscribeReply
- type Transport
- type TransportInfo
- type UnsubscribeEvent
- type UnsubscribeHandler
- type UnsubscribeOption
- type UnsubscribeOptions
- type UnsubscribeReply
- type WebsocketConfig
- type WebsocketHandler
Constants ¶
const ( DefaultWebsocketPingInterval = 25 * time.Second DefaultWebsocketWriteTimeout = 1 * time.Second DefaultWebsocketMessageSizeLimit = 65536 // 64KB )
Defaults.
const ( // UseSeqGen enables using Seq and Gen fields instead of Offset. UseSeqGen uint64 = 1 << iota )
Variables ¶
var ( // DisconnectNormal is clean disconnect when client cleanly closed connection. DisconnectNormal = &Disconnect{ Code: 3000, Reason: "normal", Reconnect: true, } // DisconnectShutdown sent when node is going to shut down. DisconnectShutdown = &Disconnect{ Code: 3001, Reason: "shutdown", Reconnect: true, } // DisconnectInvalidToken sent when client came with invalid token. DisconnectInvalidToken = &Disconnect{ Code: 3002, Reason: "invalid token", Reconnect: false, } // DisconnectBadRequest sent when client uses malformed protocol // frames or wrong order of commands. DisconnectBadRequest = &Disconnect{ Code: 3003, Reason: "bad request", Reconnect: false, } // DisconnectServerError sent when internal error occurred on server. DisconnectServerError = &Disconnect{ Code: 3004, Reason: "internal server error", Reconnect: true, } // DisconnectExpired sent when client connection expired. DisconnectExpired = &Disconnect{ Code: 3005, Reason: "expired", Reconnect: true, } // DisconnectSubExpired sent when client subscription expired. DisconnectSubExpired = &Disconnect{ Code: 3006, Reason: "subscription expired", Reconnect: true, } // DisconnectStale sent to close connection that did not become // authenticated in configured interval after dialing. DisconnectStale = &Disconnect{ Code: 3007, Reason: "stale", Reconnect: false, } // DisconnectSlow sent when client can't read messages fast enough. DisconnectSlow = &Disconnect{ Code: 3008, Reason: "slow", Reconnect: true, } // DisconnectWriteError sent when an error occurred while writing to // client connection. DisconnectWriteError = &Disconnect{ Code: 3009, Reason: "write error", Reconnect: true, } // DisconnectInsufficientState sent when server detects wrong client // position in channel Publication stream. Disconnect allows client // to restore missed publications on reconnect. DisconnectInsufficientState = &Disconnect{ Code: 3010, Reason: "insufficient state", Reconnect: true, } // DisconnectForceReconnect sent when server forcely disconnects connection. DisconnectForceReconnect = &Disconnect{ Code: 3011, Reason: "force reconnect", Reconnect: true, } // DisconnectForceNoReconnect sent when server forcely disconnects connection // and asks it to not reconnect again. DisconnectForceNoReconnect = &Disconnect{ Code: 3012, Reason: "force disconnect", Reconnect: false, } )
Some predefined disconnect structures used by library internally. Though it's always possible to create Disconnect with any field values on the fly. Library users supposed to use codes in range 4000-4999 for custom disconnects.
var ( // ErrorInternal means server error, if returned this is a signal // that something went wrong with server itself and client most probably // not guilty. ErrorInternal = &Error{ Code: 100, Message: "internal server error", } ErrorUnauthorized = &Error{ Code: 101, Message: "unauthorized", } // ErrorNamespaceNotFound means that namespace in channel name does not exist. ErrorNamespaceNotFound = &Error{ Code: 102, Message: "namespace not found", } // ErrorPermissionDenied means that access to resource not allowed. ErrorPermissionDenied = &Error{ Code: 103, Message: "permission denied", } // ErrorMethodNotFound means that method sent in command does not exist. ErrorMethodNotFound = &Error{ Code: 104, Message: "method not found", } // ErrorAlreadySubscribed returned when client wants to subscribe on channel // it already subscribed to. ErrorAlreadySubscribed = &Error{ Code: 105, Message: "already subscribed", } // ErrorLimitExceeded says that some sort of limit exceeded, server logs should // give more detailed information. ErrorLimitExceeded = &Error{ Code: 106, Message: "limit exceeded", } // ErrorBadRequest says that server can not process received // data because it is malformed. ErrorBadRequest = &Error{ Code: 107, Message: "bad request", } // ErrorNotAvailable means that resource is not enabled. ErrorNotAvailable = &Error{ Code: 108, Message: "not available", } // ErrorTokenExpired ... ErrorTokenExpired = &Error{ Code: 109, Message: "token expired", } // ErrorExpired ... ErrorExpired = &Error{ Code: 110, Message: "expired", } )
Here we define well-known errors that can be used in client protocol replies. Library user can define own application specific errors. When define new custom error it is recommended to use error codes > 1000 assuming that codes in interval 0-999 reserved by Centrifuge.
var CompatibilityFlags uint64
CompatibilityFlags is a global set of legacy features we support for backwards compatibility.
Should be removed with v1 library release. TODO v1: remove.
var DefaultConfig = Config{ Name: "centrifuge", NodeInfoMetricsAggregateInterval: 60 * time.Second, ChannelMaxLength: 255, ChannelPrivatePrefix: "$", ChannelNamespaceBoundary: ":", ChannelUserBoundary: "#", ChannelUserSeparator: ",", ClientPresencePingInterval: 25 * time.Second, ClientPresenceExpireInterval: 60 * time.Second, ClientExpiredCloseDelay: 25 * time.Second, ClientExpiredSubCloseDelay: 25 * time.Second, ClientStaleCloseDelay: 25 * time.Second, ClientChannelPositionCheckDelay: 40 * time.Second, ClientQueueMaxSize: 10485760, ClientChannelLimit: 128, }
DefaultConfig is Config initialized with default values for all fields.
var ( // ErrNoChannelOptions returned when operation can't be performed because no // appropriate channel options were found for channel. ErrNoChannelOptions = errors.New("no channel options found") )
Functions ¶
func LogLevelToString ¶
LogLevelToString transforms Level to its string representation.
func SetCredentials ¶
func SetCredentials(ctx context.Context, cred *Credentials) context.Context
SetCredentials allows to set connection Credentials to context. Credentials set to context will be used by centrifuge library then to authenticate user.
Types ¶
type Broker ¶
type Broker interface { // Run called once on start when broker already set to node. At // this moment node is ready to process broker events. Run(BrokerEventHandler) error // Subscribe node on channel to listen all messages coming from channel. Subscribe(ch string) error // Unsubscribe node from channel to stop listening messages from it. Unsubscribe(ch string) error // Publish allows to send Publication Push into channel. Publications should // be delivered to all clients subscribed on this channel at moment on // any Centrifuge node (with at most once delivery guarantee). Publish(ch string, pub *protocol.Publication, opts *ChannelOptions) error // PublishJoin publishes Join Push message into channel. PublishJoin(ch string, join *protocol.Join, opts *ChannelOptions) error // PublishLeave publishes Leave Push message into channel. PublishLeave(ch string, leave *protocol.Leave, opts *ChannelOptions) error // PublishControl allows to send control command data to all running nodes. PublishControl(data []byte) error // Channels returns slice of currently active channels (with one or more // subscribers) on all running nodes. This is possible with Redis but can // be much harder in other PUB/SUB system. Anyway this information can only // be used for admin needs to better understand state of system. So it's not // a big problem if another Broker implementation won't support this method. Channels() ([]string, error) }
Broker is responsible for PUB/SUB mechanics.
type BrokerEventHandler ¶
type BrokerEventHandler interface { // HandlePublication to handle received Publications. HandlePublication(ch string, pub *protocol.Publication) error // HandleJoin to handle received Join messages. HandleJoin(ch string, join *protocol.Join) error // HandleLeave to handle received Leave messages. HandleLeave(ch string, leave *protocol.Leave) error // HandleControl to handle received control data. HandleControl(data []byte) error }
BrokerEventHandler can handle messages received from PUB/SUB system.
type ChannelContext ¶
type ChannelContext struct { Info []byte // contains filtered or unexported fields }
ChannelContext contains extra context for channel connection subscribed to.
type ChannelNamespace ¶
type ChannelNamespace struct { // Name is a unique namespace name. Name string `json:"name"` // Options for namespace determine channel options for channels // belonging to this namespace. ChannelOptions `mapstructure:",squash"` }
ChannelNamespace allows to create channels with different channel options.
type ChannelOptions ¶
type ChannelOptions struct { // ServerSide marks all channels in namespace as server side, when on then // all client subscribe requests to these channels will be rejected with // PermissionDenied error. ServerSide bool `mapstructure:"server_side" json:"server_side"` // Publish enables possibility for clients to publish messages into channels. // Once enabled client can publish into channel and that publication will be // broadcasted to all current channel subscribers. You can control publishing // on server-side setting On().Publish callback to client connection. Publish bool `json:"publish"` // SubscribeToPublish turns on an automatic check that client subscribed // on channel before allow it to publish into that channel. SubscribeToPublish bool `mapstructure:"subscribe_to_publish" json:"subscribe_to_publish"` // Anonymous enables anonymous access (with empty user ID) to channel. // In most situations your application works with authenticated users so // every user has its own unique user ID. But if you provide real-time // features for public access you may need unauthenticated access to channels. // Turn on this option and use empty string as user ID. Anonymous bool `json:"anonymous"` // JoinLeave turns on join/leave messages for channels. // When client subscribes on channel join message sent to all // clients in this channel. When client leaves channel (unsubscribes) // leave message sent. This option does not fit well for channels with // many subscribers because every subscribe/unsubscribe event results // into join/leave event broadcast to all other active subscribers. JoinLeave bool `mapstructure:"join_leave" json:"join_leave"` // Presence turns on presence information for channels. // Presence is a structure with clients currently subscribed on channel. Presence bool `json:"presence"` // PresenceDisableForClient prevents presence to be asked by clients. // In this case it's available only over server-side presence call. PresenceDisableForClient bool `mapstructure:"presence_disable_for_client" json:"presence_disable_for_client"` // HistorySize determines max amount of history messages for channel, // 0 means no history for channel. Centrifugo history has auxiliary // role – it can not replace your backend persistent storage. HistorySize int `mapstructure:"history_size" json:"history_size"` // HistoryLifetime determines time in seconds until expiration for // history messages. As Centrifuge-based server keeps history in memory // (for example in process memory or in Redis process memory) it's // important to remove old messages to prevent infinite memory grows. HistoryLifetime int `mapstructure:"history_lifetime" json:"history_lifetime"` // Recover enables recover mechanism for channels. This means that // server will try to recover missed messages for resubscribing // client. This option uses publications from history and must be used // with reasonable HistorySize and HistoryLifetime configuration. HistoryRecover bool `mapstructure:"history_recover" json:"history_recover"` // HistoryDisableForClient prevents history to be asked by clients. // In this case it's available only over server-side history call. // History recover mechanism if enabled will continue to work for // clients anyway. HistoryDisableForClient bool `mapstructure:"history_disable_for_client" json:"history_disable_for_client"` }
ChannelOptions represent channel specific configuration for namespace or global channel options if set on top level of configuration.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents client connection to server.
func (*Client) Channels ¶
func (c *Client) Channels() map[string]ChannelContext
Channels returns a map of channels client connection currently subscribed to.
func (*Client) Close ¶
func (c *Client) Close(disconnect *Disconnect) error
Close client connection with specific disconnect reason.
func (*Client) Handle ¶ added in v0.1.0
Handle raw data encoded with Centrifuge protocol. Not goroutine-safe.
func (*Client) On ¶
func (c *Client) On() *ClientEventHub
On returns ClientEventHub to set various event handlers to client.
func (*Client) Send ¶
Send data to client. This sends an asynchronous message – data will be just written to connection. on client side this message can be handled with Message handler.
func (*Client) Transport ¶
func (c *Client) Transport() TransportInfo
Transport returns transport details used by client connection.
func (*Client) Unsubscribe ¶
func (c *Client) Unsubscribe(ch string, opts ...UnsubscribeOption) error
Unsubscribe allows to unsubscribe client from channel.
type ClientEventHub ¶
type ClientEventHub struct {
// contains filtered or unexported fields
}
ClientEventHub allows to deal with client event handlers. All its methods are not goroutine-safe and supposed to be called once on client connect.
func (*ClientEventHub) Disconnect ¶
func (c *ClientEventHub) Disconnect(h DisconnectHandler)
Disconnect allows to set DisconnectHandler. DisconnectHandler called when client disconnected.
func (*ClientEventHub) Message ¶
func (c *ClientEventHub) Message(h MessageHandler)
Message allows to set MessageHandler. MessageHandler called when client sent asynchronous message.
func (*ClientEventHub) Publish ¶
func (c *ClientEventHub) Publish(h PublishHandler)
Publish allows to set PublishHandler. PublishHandler called when client publishes message into channel.
func (*ClientEventHub) RPC ¶
func (c *ClientEventHub) RPC(h RPCHandler)
RPC allows to set RPCHandler. RPCHandler will be executed on every incoming RPC call.
func (*ClientEventHub) SubRefresh ¶
func (c *ClientEventHub) SubRefresh(h SubRefreshHandler)
SubRefresh allows to set SubRefreshHandler. SubRefreshHandler called when it's time to refresh client subscription.
func (*ClientEventHub) Subscribe ¶
func (c *ClientEventHub) Subscribe(h SubscribeHandler)
Subscribe allows to set SubscribeHandler. SubscribeHandler called when client subscribes on channel.
func (*ClientEventHub) Unsubscribe ¶
func (c *ClientEventHub) Unsubscribe(h UnsubscribeHandler)
Unsubscribe allows to set UnsubscribeHandler. UnsubscribeHandler called when client unsubscribes from channel.
type ClientInfo ¶
type ClientInfo = protocol.ClientInfo
ClientInfo contains information about client connection.
type Closer ¶
type Closer interface { // Close when called should clean up used resources. Close(ctx context.Context) error }
Closer is an interface that Broker, HistoryManager and PresenceManager can optionally implement if they need to close any resources on Centrifuge node shutdown.
type Config ¶
type Config struct { // ChannelOptions embedded. ChannelOptions // Namespaces – list of namespaces for custom channel options. Namespaces []ChannelNamespace // Version of server – will be sent to client on connection establishment // phase in response to connect request. Version string // Name of this server node - must be unique, used as human readable // and meaningful node identifier. Name string // TokenHMACSecretKey is a secret key used to validate connection and subscription // tokens generated using HMAC. Zero value means that HMAC tokens won't be allowed. TokenHMACSecretKey string // UserPersonalChannelPrefix defines prefix to be added to user personal channel. UserPersonalChannelNamespace string // ChannelPrivatePrefix is a prefix in channel name which indicates that // channel is private. ChannelPrivatePrefix string // ChannelNamespaceBoundary is a string separator which must be put after // namespace part in channel name. ChannelNamespaceBoundary string // ChannelUserBoundary is a string separator which must be set before allowed // users part in channel name. ChannelUserBoundary string // ChannelUserSeparator separates allowed users in user part of channel name. ChannelUserSeparator string // TokenRSAPublicKey is a public key used to validate connection and subscription // tokens generated using RSA. Zero value means that RSA tokens won't be allowed. TokenRSAPublicKey *rsa.PublicKey // ClientPresencePingInterval is an interval how often connected clients // must update presence info. ClientPresencePingInterval time.Duration // ClientPresenceExpireInterval is an interval how long to consider // presence info valid after receiving presence ping. ClientPresenceExpireInterval time.Duration // ClientExpiredCloseDelay is an extra time given to client to // refresh its connection in the end of connection lifetime. ClientExpiredCloseDelay time.Duration // ClientExpiredSubCloseDelay is an extra time given to client to // refresh its expiring subscription in the end of subscription lifetime. ClientExpiredSubCloseDelay time.Duration // ClientStaleCloseDelay is a timeout after which connection will be // closed if still not authenticated (i.e. no valid connect command // received yet). ClientStaleCloseDelay time.Duration // ClientChannelPositionCheckDelay defines minimal time from previous // client position check in channel. If client does not pass check it will // be disconnected with DisconnectInsufficientState. ClientChannelPositionCheckDelay time.Duration // NodeInfoMetricsAggregateInterval sets interval for automatic metrics aggregation. // It's not very reasonable to have it less than one second. NodeInfoMetricsAggregateInterval time.Duration // LogLevel is a log level to use. By default nothing will be logged. LogLevel LogLevel // LogHandler is a handler func node will send logs to. LogHandler LogHandler // ClientQueueMaxSize is a maximum size of client's message queue in bytes. // After this queue size exceeded Centrifugo closes client's connection. ClientQueueMaxSize int // ClientChannelLimit sets upper limit of channels each client can subscribe to. ClientChannelLimit int // ClientUserConnectionLimit limits number of client connections from user with the // same ID. 0 - unlimited. ClientUserConnectionLimit int // ChannelMaxLength is a maximum length of channel name. ChannelMaxLength int // ClientInsecure turns on insecure mode for client connections - when it's // turned on then no authentication required at all when connecting to Centrifugo, // anonymous access and publish allowed for all channels, no connection expire // performed. This can be suitable for demonstration or personal usage. ClientInsecure bool // ClientAnonymous when set to true, allows connect requests without specifying // a token or setting Credentials in authentication middleware. The resulting // user will have empty string for user ID, meaning user can only subscribe // to anonymous channels. ClientAnonymous bool // UserSubscribeToPersonal enables automatic subscribing to personal channel by user. // Only users with user ID defined will subscribe to personal channels, anonymous // users are ignored. UserSubscribeToPersonal bool }
Config contains Node configuration options.
type ConnectEvent ¶
type ConnectEvent struct { // ClientID that was generated by library for client connection. ClientID string // Token received from client as part of Connect Command. Token string // Data received from client as part of Connect Command. Data []byte }
ConnectEvent contains fields related to connecting event.
type ConnectReply ¶
type ConnectReply struct { // Context allows to return modified context. Context context.Context // Error for connect command reply. Error *Error // Disconnect client. Disconnect *Disconnect // Credentials should be set if app wants to authenticate connection. // This field still optional as auth could be provided through HTTP middleware // or via JWT token. Credentials *Credentials // Data allows to set custom data in connect reply. Data []byte // Channels slice contains channels to subscribe connection to on server-side. Channels []string }
ConnectReply contains fields determining the reaction on auth event.
type ConnectedHandler ¶ added in v0.0.2
ConnectedHandler called when new client connects to server.
type ConnectingHandler ¶ added in v0.0.2
type ConnectingHandler func(context.Context, TransportInfo, ConnectEvent) ConnectReply
ConnectingHandler called when new client authenticates on server.
type Credentials ¶
type Credentials struct { // UserID tells library an ID of connecting user. UserID string // ExpireAt allows to set time in future when connection must be validated. // In this case OnRefresh callback must be set by application. ExpireAt int64 // Info contains additional information about connection. This will be // included untouched into Join/Leave messages, into Presence information, // also info becomes a part of published message if it was published from // client directly. In some cases having additional info can be an // overhead – but you are simply free to not use it. Info []byte }
Credentials allows to authenticate connection when set into context.
func GetCredentials ¶ added in v0.5.0
func GetCredentials(ctx context.Context) (*Credentials, bool)
GetCredentials allows to get previously set Credentials from context.
type Disconnect ¶
type Disconnect struct { // Code is disconnect code. Code int `json:"code,omitempty"` // Reason is a short description of disconnect. Reason string `json:"reason"` // Reconnect gives client an advice to reconnect after disconnect or not. Reconnect bool `json:"reconnect"` // contains filtered or unexported fields }
Disconnect allows to configure how client will be disconnected from server. The important note that Disconnect serialized to JSON must be less than 127 bytes due to WebSocket protocol limitations (because at moment we send Disconnect inside reason field of WebSocket close handshake). Note that due to performance reasons we cache Disconnect text representation for Close Frame on first send to client so changing field values inside existing Disconnect instance won't be reflected in WebSocket/Sockjs Close frames.
func (*Disconnect) CloseText ¶ added in v0.8.2
func (d *Disconnect) CloseText() string
CloseText allows to build disconnect advice sent inside close frame. At moment we don't encode Code here to not duplicate information since it is sent separately as Code of WebSocket/SockJS Close Frame.
func (*Disconnect) String ¶ added in v0.8.2
func (d *Disconnect) String() string
String representation.
type DisconnectEvent ¶
type DisconnectEvent struct { // Disconnect can optionally contain a custom disconnect object that // was sent from server to client with closing handshake. If this field // exists then client connection was closed from server. If this field // is nil then this means that client disconnected normally and connection // closing was initiated by client side. Disconnect *Disconnect }
DisconnectEvent contains fields related to disconnect event.
type DisconnectHandler ¶
type DisconnectHandler func(DisconnectEvent) DisconnectReply
DisconnectHandler called when client disconnects from server.
type DisconnectOption ¶ added in v0.8.0
type DisconnectOption func(options *DisconnectOptions)
DisconnectOption is a type to represent various Unsubscribe options.
func WithReconnect ¶ added in v0.8.0
func WithReconnect() DisconnectOption
WithReconnect allows to set Reconnect flag to true.
type DisconnectOptions ¶ added in v0.8.0
type DisconnectOptions struct { // Reconnect allows to set reconnect flag. Reconnect bool }
DisconnectOptions define some fields to alter behaviour of Disconnect operation.
type DisconnectReply ¶
type DisconnectReply struct{}
DisconnectReply contains fields determining the reaction on disconnect event.
type EncodingType ¶ added in v0.1.0
type EncodingType string
EncodingType represents client payload encoding format.
const ( // EncodingTypeJSON means JSON payload. EncodingTypeJSON EncodingType = "json" // EncodingTypeBinary means binary payload. EncodingTypeBinary EncodingType = "binary" )
type Engine ¶
type Engine interface { Broker HistoryManager PresenceManager }
Engine is responsible for PUB/SUB mechanics, channel history and presence information.
type HistoryFilter ¶
type HistoryFilter struct { // Since used to extract publications from stream since provided StreamPosition. Since *StreamPosition // Limit number of publications to return. // -1 means no limit - i.e. return all publications currently in stream. // 0 means that caller only interested in current stream top position so // Engine should not return any publications. Limit int }
HistoryFilter allows to filter history according to fields set.
type HistoryManager ¶
type HistoryManager interface { // History used to extract Publications from storage. // Publications returned according to HistoryFilter which allows // to set several filtering options. // StreamPosition returned describes current history stream top // offset and epoch. History(ch string, filter HistoryFilter) ([]*protocol.Publication, StreamPosition, error) // AddHistory adds Publication to channel history. Storage should // automatically maintain history size and lifetime according to // channel options if needed. // StreamPosition returned here describes current stream top offset // and epoch. // Second return value is a boolean flag which when true tells that // Publication already published to PUB/SUB system so node should // not additionally call Broker Publish method. This can be useful // for situations when HistoryManager can atomically save Publication // to history and publish it towards online subscribers (ex. over Lua // in Redis via single RTT). AddHistory(ch string, pub *protocol.Publication, opts *ChannelOptions) (StreamPosition, bool, error) // RemoveHistory removes history from channel. This is in general not // needed as history expires automatically (based on history_lifetime) // but sometimes can be useful for application logic. RemoveHistory(ch string) error }
HistoryManager is responsible for dealing with channel history management.
type HistoryOption ¶ added in v0.8.0
type HistoryOption func(options *HistoryOptions)
HistoryOption is a type to represent various History options.
func Since ¶ added in v0.8.0
func Since(sp StreamPosition) HistoryOption
Since allows to set Since option.
func WithLimit ¶ added in v0.8.0
func WithLimit(limit int) HistoryOption
WithLimit allows to set limit.
func WithNoLimit ¶ added in v0.8.0
func WithNoLimit() HistoryOption
WithNoLimit allows to not limit returned Publications amount. Should be used carefully inside large history streams.
type HistoryOptions ¶ added in v0.8.0
type HistoryOptions struct { // Since used to extract publications from stream since provided StreamPosition. Since *StreamPosition // Limit number of publications to return. // -1 means no limit - i.e. return all publications currently in stream. // 0 means that caller only interested in current stream top position so Engine // should not return any publications in result. // Positive integer does what it should. Limit int }
HistoryOptions define some fields to alter History method behaviour.
type HistoryResult ¶ added in v0.8.0
type HistoryResult struct { // StreamPosition embedded here describes current stream top offset and epoch. StreamPosition // Publications extracted from history storage according to HistoryFilter. Publications []*Publication }
HistoryResult contains Publications and current stream top StreamPosition.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub manages client connections.
func (*Hub) NumChannels ¶
NumChannels returns a total number of different channels.
func (*Hub) NumClients ¶
NumClients returns total number of client connections.
func (*Hub) NumSubscribers ¶
NumSubscribers returns number of current subscribers for a given channel.
type Info ¶
type Info struct {
Nodes []NodeInfo
}
Info contains information about all known server nodes.
type LogHandler ¶
type LogHandler func(LogEntry)
LogHandler handles log entries - i.e. writes into correct destination if necessary.
type LogLevel ¶
type LogLevel int
LogLevel describes the chosen log level.
const ( // LogLevelNone means no logging. LogLevelNone LogLevel = iota // LogLevelDebug turns on debug logs - its generally too much for production in normal // conditions but can help when developing and investigating problems in production. LogLevelDebug // LogLevelInfo is logs useful server information. This includes various information // about problems with client connections which is not Centrifugo errors but // in most situations malformed client behaviour. LogLevelInfo // LogLevelError level logs only server errors. This is logging that means non-working // Centrifugo and maybe effort from developers/administrators to make things // work again. LogLevelError )
type MemoryEngine ¶
type MemoryEngine struct {
// contains filtered or unexported fields
}
MemoryEngine is builtin default engine which allows to run Centrifuge-based server without any external broker or storage. All data managed inside process memory.
With this engine you can only run single Centrifuge node. If you need to scale you should consider using another engine implementation instead – for example Redis engine.
Running single node can be sufficient for many use cases especially when you need maximum performance and not too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.
func NewMemoryEngine ¶
func NewMemoryEngine(n *Node, c MemoryEngineConfig) (*MemoryEngine, error)
NewMemoryEngine initializes Memory Engine.
func (*MemoryEngine) AddHistory ¶
func (e *MemoryEngine) AddHistory(ch string, pub *protocol.Publication, opts *ChannelOptions) (StreamPosition, bool, error)
AddHistory - see engine interface description.
func (*MemoryEngine) AddPresence ¶
func (e *MemoryEngine) AddPresence(ch string, uid string, info *protocol.ClientInfo, _ time.Duration) error
AddPresence - see engine interface description.
func (*MemoryEngine) Channels ¶
func (e *MemoryEngine) Channels() ([]string, error)
Channels - see engine interface description.
func (*MemoryEngine) History ¶
func (e *MemoryEngine) History(ch string, filter HistoryFilter) ([]*protocol.Publication, StreamPosition, error)
History - see engine interface description.
func (*MemoryEngine) Presence ¶
func (e *MemoryEngine) Presence(ch string) (map[string]*protocol.ClientInfo, error)
Presence - see engine interface description.
func (*MemoryEngine) PresenceStats ¶
func (e *MemoryEngine) PresenceStats(ch string) (PresenceStats, error)
PresenceStats - see engine interface description.
func (*MemoryEngine) Publish ¶
func (e *MemoryEngine) Publish(ch string, pub *protocol.Publication, _ *ChannelOptions) error
Publish adds message into history hub and calls node ClientMsg method to handle message. We don't have any PUB/SUB here as Memory Engine is single node only.
func (*MemoryEngine) PublishControl ¶
func (e *MemoryEngine) PublishControl(data []byte) error
PublishControl - see Engine interface description.
func (*MemoryEngine) PublishJoin ¶
func (e *MemoryEngine) PublishJoin(ch string, join *protocol.Join, _ *ChannelOptions) error
PublishJoin - see engine interface description.
func (*MemoryEngine) PublishLeave ¶
func (e *MemoryEngine) PublishLeave(ch string, leave *protocol.Leave, _ *ChannelOptions) error
PublishLeave - see engine interface description.
func (*MemoryEngine) RemoveHistory ¶
func (e *MemoryEngine) RemoveHistory(ch string) error
RemoveHistory - see engine interface description.
func (*MemoryEngine) RemovePresence ¶
func (e *MemoryEngine) RemovePresence(ch string, uid string) error
RemovePresence - see engine interface description.
func (*MemoryEngine) Run ¶
func (e *MemoryEngine) Run(h BrokerEventHandler) error
Run runs memory engine - we do not have any logic here as Memory Engine ready to work just after initialization.
func (*MemoryEngine) Subscribe ¶
func (e *MemoryEngine) Subscribe(_ string) error
Subscribe is noop here.
func (*MemoryEngine) Unsubscribe ¶
func (e *MemoryEngine) Unsubscribe(_ string) error
Unsubscribe node from channel.
type MemoryEngineConfig ¶
type MemoryEngineConfig struct { // HistoryMetaTTL sets a time of inactive stream meta information expiration. // Must have a reasonable value for application. // At moment works with seconds precision. // TODO v1: maybe make this channel namespace option? // TODO v1: since we have epoch things should also properly work without meta // information at all (but we loose possibility of long-term recover in stream // without new messages). HistoryMetaTTL time.Duration }
MemoryEngineConfig is a memory engine config.
type MessageEvent ¶
type MessageEvent struct { // Data contains message untouched payload. Data []byte }
MessageEvent contains fields related to message request.
type MessageHandler ¶
type MessageHandler func(MessageEvent) MessageReply
MessageHandler must handle incoming async message from client.
type MessageReply ¶
type MessageReply struct {
Disconnect *Disconnect
}
MessageReply contains fields determining the reaction on message request.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a heart of centrifuge library – it internally keeps and manages client connections, maintains information about other centrifuge nodes, keeps useful references to things like engine, hub etc.
func (*Node) ChannelOpts ¶
func (n *Node) ChannelOpts(ch string) (ChannelOptions, bool)
ChannelOpts returns channel options for channel using current channel config.
func (*Node) Channels ¶
Channels returns list of all channels currently active across on all nodes. This is a snapshot of state mostly useful for understanding what's going on with system.
func (*Node) Disconnect ¶
func (n *Node) Disconnect(user string, opts ...DisconnectOption) error
Disconnect allows to close all user connections through all nodes.
func (*Node) History ¶
func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error)
History allows to extract Publications in channel. The channel must belong to namespace where history is on.
func (*Node) LogEnabled ¶
LogEnabled allows to log entry.
func (*Node) NotifyShutdown ¶
func (n *Node) NotifyShutdown() chan struct{}
NotifyShutdown returns a channel which will be closed on node shutdown.
func (*Node) PersonalChannel ¶ added in v0.5.0
PersonalChannel returns personal channel for user based on node configuration.
func (*Node) Presence ¶
func (n *Node) Presence(ch string) (map[string]*ClientInfo, error)
Presence returns a map with information about active clients in channel.
func (*Node) PresenceStats ¶
func (n *Node) PresenceStats(ch string) (PresenceStats, error)
PresenceStats returns presence stats from engine.
func (*Node) Publish ¶
func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error)
Publish sends data to all clients subscribed on channel. All running nodes will receive it and send to all local channel subscribers.
Data expected to be valid marshaled JSON or any binary payload. Connections that work over JSON protocol can not handle custom binary payloads. Connections that work over Protobuf protocol can work both with JSON and binary payloads.
So the rule here: if you have channel subscribers that work using JSON protocol then you can not publish binary data to these channel.
The returned PublishResult contains embedded StreamPosition that describes position inside stream Publication was added too. For channels without history enabled (i.e. when Publications only sent to PUB/SUB system) StreamPosition will be an empty struct (i.e. PublishResult.Offset will be zero).
func (*Node) RemoveHistory ¶
RemoveHistory removes channel history.
func (*Node) Run ¶
Run performs node startup actions. At moment must be called once on start after engine set to Node.
func (*Node) SetHistoryManager ¶
func (n *Node) SetHistoryManager(m HistoryManager)
SetHistoryManager allows to set HistoryManager to use.
func (*Node) SetPresenceManager ¶
func (n *Node) SetPresenceManager(m PresenceManager)
SetPresenceManager allows to set PresenceManager to use.
func (*Node) Shutdown ¶
Shutdown sets shutdown flag to Node so handlers could stop accepting new requests and disconnects clients with shutdown reason.
func (*Node) Unsubscribe ¶
func (n *Node) Unsubscribe(user string, ch string, opts ...UnsubscribeOption) error
Unsubscribe unsubscribes user from channel, if channel is equal to empty string then user will be unsubscribed from all channels.
type NodeEventHub ¶
type NodeEventHub interface { // Auth happens when client sends Connect command to server. In this handler client // can reject connection or provide Credentials for it. ClientConnecting(handler ConnectingHandler) // Connect called after client connection has been successfully established, // authenticated and connect reply already sent to client. This is a place // where application should set all required connection event callbacks and // can start communicating with client. ClientConnected(handler ConnectedHandler) // ClientRefresh called when it's time to refresh expiring client connection. ClientRefresh(handler RefreshHandler) }
NodeEventHub can deal with events binded to Node. All its methods are not goroutine-safe as handlers must be registered once before Node Run method called.
type NodeInfo ¶
type NodeInfo struct { UID string Name string Version string NumClients uint32 NumUsers uint32 NumChannels uint32 Uptime uint32 Metrics *Metrics }
NodeInfo contains information about node.
type PresenceManager ¶
type PresenceManager interface { // Presence returns actual presence information for channel. Presence(ch string) (map[string]*protocol.ClientInfo, error) // PresenceStats returns short stats of current presence data // suitable for scenarios when caller does not need full client // info returned by presence method. PresenceStats(ch string) (PresenceStats, error) // AddPresence sets or updates presence information in channel // for connection with specified identifier. Engine should have a // property to expire client information that was not updated // (touched) after some configured time interval. AddPresence(ch string, clientID string, info *protocol.ClientInfo, expire time.Duration) error // RemovePresence removes presence information for connection // with specified identifier. RemovePresence(ch string, clientID string) error }
PresenceManager is responsible for channel presence management.
type PresenceStats ¶
type PresenceStats struct { // NumClients is a number of client connections in channel. NumClients int // NumUsers is a number of unique users in channel. NumUsers int }
PresenceStats represents a short presence information for channel.
type ProtocolType ¶ added in v0.1.0
type ProtocolType string
ProtocolType represents client connection transport encoding format.
const ( // ProtocolTypeJSON means JSON protocol - i.e. data encoded in // JSON-streaming format. ProtocolTypeJSON ProtocolType = "json" // ProtocolTypeProtobuf means protobuf protocol - i.e. data encoded // as length-delimited protobuf messages. ProtocolTypeProtobuf ProtocolType = "protobuf" )
type Publication ¶
type Publication = protocol.Publication
Publication contains Data sent to channel subscribers. In channels with recover option on it also has incremental Offset. If Publication sent from client side it can also have Info (otherwise nil).
type PublishEvent ¶
type PublishEvent struct { Channel string Data []byte Info *ClientInfo }
PublishEvent contains fields related to publish event. Note that this event called before actual publish to Engine so handler has an option to reject this publication returning an error in PublishReply.
type PublishHandler ¶
type PublishHandler func(PublishEvent) PublishReply
PublishHandler called when client publishes into channel.
type PublishOption ¶
type PublishOption func(*PublishOptions)
PublishOption is a type to represent various Publish options.
type PublishOptions ¶
type PublishOptions struct { // SkipHistory allows to prevent saving specific Publication to channel history. SkipHistory bool }
PublishOptions define some fields to alter behaviour of Publish operation.
type PublishReply ¶
type PublishReply struct { // Error to return, nil value means no error. Error *Error // Disconnect client, nil value means no disconnect. Disconnect *Disconnect // Data is modified data to publish, zero value means no modification // of original data published by client. Data []byte }
PublishReply contains fields determining the reaction on publish event.
type RPCEvent ¶
type RPCEvent struct { // Method is an optional string that contains RPC method name client wants to call. // This is an optional field, by default clients send RPC without any method set. Method string // Data contains RPC untouched payload. Data []byte }
RPCEvent contains fields related to rpc request.
type RPCHandler ¶
RPCHandler must handle incoming command from client.
type RPCReply ¶
type RPCReply struct { // Error to return, nil value means no error. Error *Error // Disconnect client, nil value means no disconnect. Disconnect *Disconnect // Data to return in RPC reply to client. Data []byte }
RPCReply contains fields determining the reaction on rpc request.
type RedisEngine ¶
type RedisEngine struct {
// contains filtered or unexported fields
}
RedisEngine uses Redis to implement Engine functionality. This engine allows to scale Centrifuge based server to many instances and load balance client connections between them. Redis engine supports additionally supports Sentinel, client-side sharding and can work with Redis Cluster (or client-side shard between different Redis Clusters).
func NewRedisEngine ¶
func NewRedisEngine(n *Node, config RedisEngineConfig) (*RedisEngine, error)
NewRedisEngine initializes Redis Engine.
func (*RedisEngine) AddHistory ¶
func (e *RedisEngine) AddHistory(ch string, pub *protocol.Publication, opts *ChannelOptions) (StreamPosition, bool, error)
AddHistory - see engine interface description.
func (*RedisEngine) AddPresence ¶
func (e *RedisEngine) AddPresence(ch string, uid string, info *protocol.ClientInfo, exp time.Duration) error
AddPresence - see engine interface description.
func (*RedisEngine) Channels ¶
func (e *RedisEngine) Channels() ([]string, error)
Channels - see engine interface description.
func (*RedisEngine) History ¶
func (e *RedisEngine) History(ch string, filter HistoryFilter) ([]*protocol.Publication, StreamPosition, error)
History - see engine interface description.
func (*RedisEngine) Presence ¶
func (e *RedisEngine) Presence(ch string) (map[string]*protocol.ClientInfo, error)
Presence - see engine interface description.
func (*RedisEngine) PresenceStats ¶
func (e *RedisEngine) PresenceStats(ch string) (PresenceStats, error)
PresenceStats - see engine interface description.
func (*RedisEngine) Publish ¶
func (e *RedisEngine) Publish(ch string, pub *protocol.Publication, opts *ChannelOptions) error
Publish - see engine interface description.
func (*RedisEngine) PublishControl ¶
func (e *RedisEngine) PublishControl(data []byte) error
PublishControl - see engine interface description.
func (*RedisEngine) PublishJoin ¶
func (e *RedisEngine) PublishJoin(ch string, join *protocol.Join, opts *ChannelOptions) error
PublishJoin - see engine interface description.
func (*RedisEngine) PublishLeave ¶
func (e *RedisEngine) PublishLeave(ch string, leave *protocol.Leave, opts *ChannelOptions) error
PublishLeave - see engine interface description.
func (*RedisEngine) RemoveHistory ¶
func (e *RedisEngine) RemoveHistory(ch string) error
RemoveHistory - see engine interface description.
func (*RedisEngine) RemovePresence ¶
func (e *RedisEngine) RemovePresence(ch string, uid string) error
RemovePresence - see engine interface description.
func (*RedisEngine) Run ¶
func (e *RedisEngine) Run(h BrokerEventHandler) error
Run runs engine after node initialized.
func (*RedisEngine) Subscribe ¶
func (e *RedisEngine) Subscribe(ch string) error
Subscribe - see engine interface description.
func (*RedisEngine) Unsubscribe ¶
func (e *RedisEngine) Unsubscribe(ch string) error
Unsubscribe - see engine interface description.
type RedisEngineConfig ¶
type RedisEngineConfig struct { // PublishOnHistoryAdd is an option to control Redis Engine behaviour in terms of // adding to history and publishing message to channel. Redis Engine have a role // of Broker, HistoryManager and PresenceManager, this option is a tip to engine // implementation about the fact that Redis Engine used as both Broker and // HistoryManager. In this case we have a possibility to save Publications into // channel history stream and publish into PUB/SUB Redis channel via single RTT. PublishOnHistoryAdd bool // HistoryMetaTTL sets a time of stream meta key expiration in Redis. Stream // meta key is a Redis HASH that contains top offset in channel and epoch value. // By default stream meta keys do not expire. // // Though in some cases – when channels created for а short time and then // not used anymore – created stream meta keys can stay in memory while // not actually useful. For example you can have a personal user channel but // after using your app for a while user left it forever. In long-term // perspective this can be an unwanted memory leak. Setting a reasonable // value to this option (usually much bigger than history retention period) // can help. In this case unused channel stream meta data will eventually expire. // // TODO v1: maybe make this channel namespace option? // TODO v1: since we have epoch things should also properly work without meta // information at all (but we loose possibility of long-term recover in stream // without new messages). HistoryMetaTTL time.Duration // UseStreams allows to enable usage of Redis streams instead of list data // structure to keep history. Redis streams are more effective in terms of // missed publication recovery and history pagination since we don't need // to load entire structure to process memory (as we do in case of Redis Lists). // TODO v1: use by default? UseStreams bool // Shards is a list of Redis instance configs. Shards []RedisShardConfig }
RedisEngineConfig is a config for Redis Engine.
type RedisShardConfig ¶
type RedisShardConfig struct { // Host is Redis server host. Host string // Port is Redis server port. Port int // Password is password to use when connecting to Redis database. If empty then password not used. Password string // DB is Redis database number. If not set then database 0 used. DB int // Whether to use TLS connection or not. UseTLS bool // Whether to skip hostname verification as part of TLS handshake. TLSSkipVerify bool // Connection TLS configuration. TLSConfig *tls.Config // MasterName is a name of Redis instance master Sentinel monitors. MasterName string // SentinelAddrs is a slice of Sentinel addresses. SentinelAddrs []string // ClusterAddrs is a slice of seed cluster addrs for this shard. ClusterAddrs []string // Prefix to use before every channel name and key in Redis. Prefix string // IdleTimeout is timeout after which idle connections to Redis will be closed. IdleTimeout time.Duration // PubSubNumWorkers sets how many PUB/SUB message processing workers will be started. // By default we start runtime.NumCPU() workers. PubSubNumWorkers int // ReadTimeout is a timeout on read operations. Note that at moment it should be greater // than node ping publish interval in order to prevent timing out Pubsub connection's // Receive call. ReadTimeout time.Duration // WriteTimeout is a timeout on write operations. WriteTimeout time.Duration // ConnectTimeout is a timeout on connect operation. ConnectTimeout time.Duration }
RedisShardConfig is struct with Redis Engine options.
type RefreshEvent ¶
type RefreshEvent struct{}
RefreshEvent contains fields related to refresh event.
type RefreshHandler ¶
type RefreshHandler func(context.Context, *Client, RefreshEvent) RefreshReply
RefreshHandler called when it's time to validate client connection and update it's expiration time.
type RefreshReply ¶
type RefreshReply struct { // Expired when set mean that connection must be closed with DisconnectExpired reason. Expired bool // ExpireAt defines time in future when connection should expire, // zero value means no expiration. ExpireAt int64 // Info allows to modify connection information, zero value means no modification. Info []byte }
RefreshReply contains fields determining the reaction on refresh event.
type SockjsConfig ¶
type SockjsConfig struct { // HandlerPrefix sets prefix for SockJS handler endpoint path. HandlerPrefix string // URL is URL address to SockJS client javascript library. URL string // HeartbeatDelay sets how often to send heartbeat frames to clients. HeartbeatDelay time.Duration // CheckOrigin allows to decide whether to use CORS or not in XHR case. // When false returned then CORS headers won't be set. CheckOrigin func(*http.Request) bool // WebsocketCheckOrigin allows to set custom CheckOrigin func for underlying // gorilla Websocket based Upgrader. WebsocketCheckOrigin func(*http.Request) bool // WebsocketReadBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. WebsocketReadBufferSize int // WebsocketWriteBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. WebsocketWriteBufferSize int // WebsocketUseWriteBufferPool enables using buffer pool for writes in Websocket transport. WebsocketUseWriteBufferPool bool // WriteTimeout is maximum time of write message operation. // Slow client will be disconnected. // By default DefaultWebsocketWriteTimeout will be used. WebsocketWriteTimeout time.Duration }
SockjsConfig represents config for SockJS handler.
type SockjsHandler ¶
type SockjsHandler struct {
// contains filtered or unexported fields
}
SockjsHandler accepts SockJS connections.
func NewSockjsHandler ¶
func NewSockjsHandler(n *Node, c SockjsConfig) *SockjsHandler
NewSockjsHandler creates new SockjsHandler.
func (*SockjsHandler) ServeHTTP ¶
func (s *SockjsHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)
type StreamPosition ¶ added in v0.8.0
type StreamPosition struct { // Offset defines publication incremental offset inside a stream. Offset uint64 // Epoch of sequence and generation. Allows to handle situations when storage // lost stream entirely for some reason (expired or lost after restart) and we // want to track this fact to prevent successful recovery from another stream. // I.e. for example we have stream [1, 2, 3], then it's lost and new stream // contains [1, 2, 3, 4], client that recovers from position 3 will only receive // publication 4 missing 1, 2, 3 from new stream. With epoch we can tell client // that correct recovery is not possible. Epoch string }
StreamPosition contains fields to describe position in stream. At moment this is used for automatic recovery mechanics. More info about stream recovery in docs: https://centrifugal.github.io/centrifugo/server/recover/.
type SubRefreshEvent ¶
type SubRefreshEvent struct { // Channel to which SubRefreshEvent belongs. Channel string }
SubRefreshEvent contains fields related to subscription refresh event.
type SubRefreshHandler ¶
type SubRefreshHandler func(SubRefreshEvent) SubRefreshReply
SubRefreshHandler called when it's time to validate client subscription to channel and update it's state if needed.
type SubRefreshReply ¶
SubRefreshReply contains fields determining the reaction on subscription refresh event.
type SubscribeEvent ¶
type SubscribeEvent struct {
Channel string
}
SubscribeEvent contains fields related to subscribe event.
type SubscribeHandler ¶
type SubscribeHandler func(SubscribeEvent) SubscribeReply
SubscribeHandler called when client wants to subscribe on channel.
type SubscribeReply ¶
type SubscribeReply struct { // Error to return, nil value means no error. Error *Error // Disconnect client, nil value means no disconnect. Disconnect *Disconnect // ExpireAt defines time in future when subscription should expire, // zero value means no expiration. ExpireAt int64 // ChannelInfo defines custom channel information, zero value means no channel information. ChannelInfo []byte }
SubscribeReply contains fields determining the reaction on subscribe event.
type Transport ¶
type Transport interface { TransportInfo // Send sends data encoded using Centrifuge protocol to connection. Write([]byte) error // Close closes transport. Close(*Disconnect) error }
Transport abstracts a connection transport between server and client. It does not contain Read method as reading can be handled by connection handler code.
type TransportInfo ¶
type TransportInfo interface { // Name returns a name of transport used for client connection. Name() string // Protocol returns underlying transport protocol type used. // At moment this can be for example a JSON streaming based protocol // or Protobuf length-delimited protocol. Protocol() ProtocolType // Encoding returns payload encoding type used by client. By default // server assumes that payload passed as JSON. Encoding() EncodingType }
TransportInfo has read-only transport description methods.
type UnsubscribeEvent ¶
type UnsubscribeEvent struct { // Channel client unsubscribed from. Channel string }
UnsubscribeEvent contains fields related to unsubscribe event.
type UnsubscribeHandler ¶
type UnsubscribeHandler func(UnsubscribeEvent) UnsubscribeReply
UnsubscribeHandler called when client unsubscribed from channel.
type UnsubscribeOption ¶ added in v0.5.0
type UnsubscribeOption func(*UnsubscribeOptions)
UnsubscribeOption is a type to represent various Unsubscribe options.
func WithResubscribe ¶ added in v0.5.0
func WithResubscribe() UnsubscribeOption
WithResubscribe allows to set Resubscribe flag to true.
type UnsubscribeOptions ¶ added in v0.5.0
type UnsubscribeOptions struct { // Resubscribe allows to set resubscribe protocol flag. Resubscribe bool }
UnsubscribeOptions define some fields to alter behaviour of Unsubscribe operation.
type UnsubscribeReply ¶
type UnsubscribeReply struct { }
UnsubscribeReply contains fields determining the reaction on unsubscribe event.
type WebsocketConfig ¶
type WebsocketConfig struct { // CompressionLevel sets a level for websocket compression. // See posiible value description at https://golang.org/pkg/compress/flate/#NewWriter CompressionLevel int // CompressionMinSize allows to set minimal limit in bytes for // message to use compression when writing it into client connection. // By default it's 0 - i.e. all messages will be compressed when // WebsocketCompression enabled and compression negotiated with client. CompressionMinSize int // ReadBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. ReadBufferSize int // WriteBufferSize is a parameter that is used for raw websocket Upgrader. // If set to zero reasonable default value will be used. WriteBufferSize int // MessageSizeLimit sets the maximum size in bytes of allowed message from client. // By default DefaultWebsocketMaxMessageSize will be used. MessageSizeLimit int // CheckOrigin func to provide custom origin check logic. // nil means allow all origins. CheckOrigin func(r *http.Request) bool // PingInterval sets interval server will send ping messages to clients. // By default DefaultPingInterval will be used. PingInterval time.Duration // WriteTimeout is maximum time of write message operation. // Slow client will be disconnected. // By default DefaultWebsocketWriteTimeout will be used. WriteTimeout time.Duration // Compression allows to enable websocket permessage-deflate // compression support for raw websocket connections. It does // not guarantee that compression will be used - i.e. it only // says that server will try to negotiate it with client. Compression bool // UseWriteBufferPool enables using buffer pool for writes. UseWriteBufferPool bool }
WebsocketConfig represents config for WebsocketHandler.
type WebsocketHandler ¶
type WebsocketHandler struct {
// contains filtered or unexported fields
}
WebsocketHandler handles websocket client connections.
func NewWebsocketHandler ¶
func NewWebsocketHandler(n *Node, c WebsocketConfig) *WebsocketHandler
NewWebsocketHandler creates new WebsocketHandler.
func (*WebsocketHandler) ServeHTTP ¶
func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)
Source Files ¶
- channel.go
- client.go
- compatibility.go
- config.go
- credentials.go
- disconnect.go
- doc.go
- engine.go
- engine_memory.go
- engine_redis.go
- errors.go
- events.go
- handler_sockjs.go
- handler_websocket.go
- hub.go
- logging.go
- metrics.go
- node.go
- options.go
- pubqueue.go
- token_verifier.go
- token_verifier_jwt.go
- transport.go
- types.go
- util.go
- writer.go