Versions in this module Expand all Collapse all v1 v1.0.1 Jul 17, 2023 Changes in this version + const NoLimit + const PushFlagConnect + const PushFlagDisconnect + const PushFlagJoin + const PushFlagLeave + const PushFlagMessage + const PushFlagPublication + const PushFlagSubscribe + const PushFlagUnsubscribe + const UnsubscribeCodeClient + const UnsubscribeCodeDisconnect + const UnsubscribeCodeExpired + const UnsubscribeCodeInsufficient + const UnsubscribeCodeServer + var DisconnectBadRequest = Disconnect + var DisconnectChannelLimit = Disconnect + var DisconnectConnectionClosed = Disconnect + var DisconnectConnectionLimit = Disconnect + var DisconnectExpired = Disconnect + var DisconnectForceNoReconnect = Disconnect + var DisconnectForceReconnect = Disconnect + var DisconnectInappropriateProtocol = Disconnect + var DisconnectInsufficientState = Disconnect + var DisconnectInvalidToken = Disconnect + var DisconnectNoPong = Disconnect + var DisconnectNotAvailable = Disconnect + var DisconnectPermissionDenied = Disconnect + var DisconnectServerError = Disconnect + var DisconnectShutdown = Disconnect + var DisconnectSlow = Disconnect + var DisconnectStale = Disconnect + var DisconnectSubExpired = Disconnect + var DisconnectTooManyErrors = Disconnect + var DisconnectTooManyRequests = Disconnect + var DisconnectWriteError = Disconnect + var ErrorAlreadySubscribed = &Error + var ErrorBadRequest = &Error + var ErrorExpired = &Error + var ErrorInternal = &Error + var ErrorLimitExceeded = &Error + var ErrorMethodNotFound = &Error + var ErrorNotAvailable = &Error + var ErrorPermissionDenied = &Error + var ErrorTokenExpired = &Error + var ErrorTooManyRequests = &Error + var ErrorUnauthorized = &Error + var ErrorUnknownChannel = &Error + var ErrorUnrecoverablePosition = &Error + func HandleReadFrame(c *Client, r io.Reader) bool + func LogLevelToString(l LogLevel) string + func NewClient(ctx context.Context, n *Node, t Transport) (*Client, ClientCloseFunc, error) + func SetCredentials(ctx context.Context, cred *Credentials) context.Context + type AliveHandler func() + type Broker interface + History func(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error) + Publish func(ch string, data []byte, opts PublishOptions) (StreamPosition, error) + PublishControl func(data []byte, nodeID, shardKey string) error + PublishJoin func(ch string, info *ClientInfo) error + PublishLeave func(ch string, info *ClientInfo) error + RemoveHistory func(ch string) error + Run func(BrokerEventHandler) error + Subscribe func(ch string) error + Unsubscribe func(ch string) error + type BrokerEventHandler interface + HandleControl func(data []byte) error + HandleJoin func(ch string, info *ClientInfo) error + HandleLeave func(ch string, info *ClientInfo) error + HandlePublication func(ch string, pub *Publication, sp StreamPosition) error + type ChannelContext struct + Source uint8 + type Client struct + func (c *Client) AcquireStorage() (map[string]any, func(map[string]any)) + func (c *Client) Channels() []string + func (c *Client) ChannelsWithContext() map[string]ChannelContext + func (c *Client) Connect(req ConnectRequest) + func (c *Client) Context() context.Context + func (c *Client) Disconnect(disconnect ...Disconnect) + func (c *Client) HandleCommand(cmd *protocol.Command, cmdProtocolSize int) bool + func (c *Client) ID() string + func (c *Client) Info() []byte + func (c *Client) IsSubscribed(ch string) bool + func (c *Client) OnAlive(h AliveHandler) + func (c *Client) OnDisconnect(h DisconnectHandler) + func (c *Client) OnHistory(h HistoryHandler) + func (c *Client) OnMessage(h MessageHandler) + func (c *Client) OnPresence(h PresenceHandler) + func (c *Client) OnPresenceStats(h PresenceStatsHandler) + func (c *Client) OnPublish(h PublishHandler) + func (c *Client) OnRPC(h RPCHandler) + func (c *Client) OnRefresh(h RefreshHandler) + func (c *Client) OnStateSnapshot(h StateSnapshotHandler) + func (c *Client) OnSubRefresh(h SubRefreshHandler) + func (c *Client) OnSubscribe(h SubscribeHandler) + func (c *Client) OnUnsubscribe(h UnsubscribeHandler) + func (c *Client) Refresh(opts ...RefreshOption) error + func (c *Client) Send(data []byte) error + func (c *Client) StateSnapshot() (any, error) + func (c *Client) Subscribe(channel string, opts ...SubscribeOption) error + func (c *Client) Transport() TransportInfo + func (c *Client) Unsubscribe(ch string, unsubscribe ...Unsubscribe) + func (c *Client) UserID() string + type ClientCloseFunc func() error + type ClientInfo struct + ChanInfo []byte + ClientID string + ConnInfo []byte + UserID string + type Closer interface + Close func(ctx context.Context) error + type CommandProcessedEvent struct + Command *protocol.Command + Disconnect *Disconnect + Reply *protocol.Reply + Started time.Time + type CommandProcessedHandler func(*Client, CommandProcessedEvent) + type CommandReadEvent struct + Command *protocol.Command + CommandSize int + type CommandReadHandler func(*Client, CommandReadEvent) error + type Config struct + ChannelMaxLength int + ClientChannelLimit int + ClientChannelPositionCheckDelay time.Duration + ClientExpiredCloseDelay time.Duration + ClientExpiredSubCloseDelay time.Duration + ClientPresenceUpdateInterval time.Duration + ClientQueueMaxSize int + ClientStaleCloseDelay time.Duration + HistoryMaxPublicationLimit int + HistoryMetaTTL time.Duration + LogHandler LogHandler + LogLevel LogLevel + MetricsNamespace string + Name string + NodeInfoMetricsAggregateInterval time.Duration + RecoveryMaxPublicationLimit int + UseSingleFlight bool + UserConnectionLimit int + Version string + type ConnectEvent struct + Channels []string + ClientID string + Data []byte + Name string + Token string + Transport TransportInfo + Version string + type ConnectHandler func(*Client) + type ConnectReply struct + ClientSideRefresh bool + Context context.Context + Credentials *Credentials + Data []byte + MaxMessagesInFrame int + PingPongConfig *PingPongConfig + QueueInitialCap int + ReplyWithoutQueue bool + Storage map[string]any + Subscriptions map[string]SubscribeOptions + WriteDelay time.Duration + type ConnectRequest struct + Data []byte + Name string + Subs map[string]SubscribeRequest + Token string + Version string + type ConnectingHandler func(context.Context, ConnectEvent) (ConnectReply, error) + type Credentials struct + ExpireAt int64 + Info []byte + UserID string + func GetCredentials(ctx context.Context) (*Credentials, bool) + type Disconnect struct + Code uint32 + Reason string + func (d Disconnect) Error() string + func (d Disconnect) String() string + type DisconnectEvent struct + type DisconnectHandler func(DisconnectEvent) + type DisconnectOption func(options *DisconnectOptions) + func WithCustomDisconnect(disconnect Disconnect) DisconnectOption + func WithDisconnectClient(clientID string) DisconnectOption + func WithDisconnectClientWhitelist(whitelist []string) DisconnectOption + func WithDisconnectSession(sessionID string) DisconnectOption + type DisconnectOptions struct + ClientWhitelist []string + Disconnect *Disconnect + type EmulationConfig struct + MaxRequestBodySize int + type EmulationHandler struct + func NewEmulationHandler(node *Node, config EmulationConfig) *EmulationHandler + func (s *EmulationHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) + type Error struct + Code uint32 + Message string + Temporary bool + func (e *Error) Error() string + type HTTPStreamConfig struct + MaxRequestBodySize int + type HTTPStreamHandler struct + func NewHTTPStreamHandler(node *Node, config HTTPStreamConfig) *HTTPStreamHandler + func (h *HTTPStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) + type HistoryCallback func(HistoryReply, error) + type HistoryEvent struct + Channel string + Filter HistoryFilter + type HistoryFilter struct + Limit int + Reverse bool + Since *StreamPosition + type HistoryHandler func(HistoryEvent, HistoryCallback) + type HistoryOption func(options *HistoryOptions) + func WithHistoryFilter(filter HistoryFilter) HistoryOption + func WithHistoryMetaTTL(metaTTL time.Duration) HistoryOption + func WithLimit(limit int) HistoryOption + func WithReverse(reverse bool) HistoryOption + func WithSince(sp *StreamPosition) HistoryOption + type HistoryOptions struct + Filter HistoryFilter + MetaTTL time.Duration + type HistoryReply struct + Result *HistoryResult + type HistoryResult struct + Publications []*Publication + type Hub struct + func (h *Hub) BroadcastPublication(ch string, pub *Publication, sp StreamPosition) error + func (h *Hub) Channels() []string + func (h *Hub) Connections() map[string]*Client + func (h *Hub) NumChannels() int + func (h *Hub) NumClients() int + func (h *Hub) NumSubscribers(ch string) int + func (h *Hub) NumSubscriptions() int + func (h *Hub) NumUsers() int + func (h *Hub) UserConnections(userID string) map[string]*Client + type Info struct + Nodes []NodeInfo + type LogEntry struct + Fields map[string]any + Level LogLevel + Message string + func NewLogEntry(level LogLevel, message string, fields ...map[string]any) LogEntry + type LogHandler func(LogEntry) + type LogLevel int + const LogLevelDebug + const LogLevelError + const LogLevelInfo + const LogLevelNone + const LogLevelTrace + const LogLevelWarn + type MemoryBroker struct + func NewMemoryBroker(n *Node, _ MemoryBrokerConfig) (*MemoryBroker, error) + func (b *MemoryBroker) Close(_ context.Context) error + func (b *MemoryBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error) + func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error) + func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error + func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error + func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error + func (b *MemoryBroker) RemoveHistory(ch string) error + func (b *MemoryBroker) Run(h BrokerEventHandler) error + func (b *MemoryBroker) Subscribe(_ string) error + func (b *MemoryBroker) Unsubscribe(_ string) error + type MemoryBrokerConfig struct + type MemoryPresenceManager struct + func NewMemoryPresenceManager(n *Node, c MemoryPresenceManagerConfig) (*MemoryPresenceManager, error) + func (m *MemoryPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error + func (m *MemoryPresenceManager) Close(_ context.Context) error + func (m *MemoryPresenceManager) Presence(ch string) (map[string]*ClientInfo, error) + func (m *MemoryPresenceManager) PresenceStats(ch string) (PresenceStats, error) + func (m *MemoryPresenceManager) RemovePresence(ch string, uid string) error + type MemoryPresenceManagerConfig struct + type MessageEvent struct + Data []byte + type MessageHandler func(MessageEvent) + type Metrics struct + Interval float64 + Items map[string]float64 + type Node struct + func New(c Config) (*Node, error) + func (n *Node) Config() Config + func (n *Node) Disconnect(userID string, opts ...DisconnectOption) error + func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error) + func (n *Node) Hub() *Hub + func (n *Node) ID() string + func (n *Node) Info() (Info, error) + func (n *Node) Log(entry LogEntry) + func (n *Node) LogEnabled(level LogLevel) bool + func (n *Node) Notify(op string, data []byte, toNodeID string) error + func (n *Node) NotifyShutdown() chan struct{} + func (n *Node) OnCommandProcessed(handler CommandProcessedHandler) + func (n *Node) OnCommandRead(handler CommandReadHandler) + func (n *Node) OnConnect(handler ConnectHandler) + func (n *Node) OnConnecting(handler ConnectingHandler) + func (n *Node) OnNodeInfoSend(handler NodeInfoSendHandler) + func (n *Node) OnNotification(handler NotificationHandler) + func (n *Node) OnSurvey(handler SurveyHandler) + func (n *Node) OnTransportWrite(handler TransportWriteHandler) + func (n *Node) Presence(ch string) (PresenceResult, error) + func (n *Node) PresenceStats(ch string) (PresenceStatsResult, error) + func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error) + func (n *Node) Refresh(userID string, opts ...RefreshOption) error + func (n *Node) RemoveHistory(ch string) error + func (n *Node) Run() error + func (n *Node) SetBroker(b Broker) + func (n *Node) SetPresenceManager(m PresenceManager) + func (n *Node) Shutdown(ctx context.Context) error + func (n *Node) Subscribe(userID string, channel string, opts ...SubscribeOption) error + func (n *Node) Survey(ctx context.Context, op string, data []byte, toNodeID string) (map[string]SurveyResult, error) + func (n *Node) Unsubscribe(userID string, channel string, opts ...UnsubscribeOption) error + type NodeInfo struct + Data []byte + Metrics *Metrics + Name string + NumChannels uint32 + NumClients uint32 + NumSubs uint32 + NumUsers uint32 + UID string + Uptime uint32 + Version string + type NodeInfoSendHandler func() NodeInfoSendReply + type NodeInfoSendReply struct + Data []byte + type NotificationEvent struct + Data []byte + FromNodeID string + Op string + type NotificationHandler func(NotificationEvent) + type PingPongConfig struct + PingInterval time.Duration + PongTimeout time.Duration + type PresenceCallback func(PresenceReply, error) + type PresenceEvent struct + Channel string + type PresenceHandler func(PresenceEvent, PresenceCallback) + type PresenceManager interface + AddPresence func(ch string, clientID string, info *ClientInfo) error + Presence func(ch string) (map[string]*ClientInfo, error) + PresenceStats func(ch string) (PresenceStats, error) + RemovePresence func(ch string, clientID string) error + type PresenceReply struct + Result *PresenceResult + type PresenceResult struct + Presence map[string]*ClientInfo + type PresenceStats struct + NumClients int + NumUsers int + type PresenceStatsCallback func(PresenceStatsReply, error) + type PresenceStatsEvent struct + Channel string + type PresenceStatsHandler func(PresenceStatsEvent, PresenceStatsCallback) + type PresenceStatsReply struct + Result *PresenceStatsResult + type PresenceStatsResult struct + type ProtocolType string + const ProtocolTypeJSON + const ProtocolTypeProtobuf + type ProtocolVersion uint8 + const ProtocolVersion2 + type Publication struct + Data []byte + Info *ClientInfo + Offset uint64 + Tags map[string]string + type PublishCallback func(PublishReply, error) + type PublishEvent struct + Channel string + ClientInfo *ClientInfo + Data []byte + type PublishHandler func(PublishEvent, PublishCallback) + type PublishOption func(*PublishOptions) + func WithClientInfo(info *ClientInfo) PublishOption + func WithHistory(size int, ttl time.Duration, metaTTL ...time.Duration) PublishOption + func WithTags(meta map[string]string) PublishOption + type PublishOptions struct + ClientInfo *ClientInfo + HistoryMetaTTL time.Duration + HistorySize int + HistoryTTL time.Duration + Tags map[string]string + type PublishReply struct + Options PublishOptions + Result *PublishResult + type PublishResult struct + type RPCCallback func(RPCReply, error) + type RPCEvent struct + Data []byte + Method string + type RPCHandler func(RPCEvent, RPCCallback) + type RPCReply struct + Data []byte + type RedisBroker struct + func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error) + func (b *RedisBroker) Close(_ context.Context) error + func (b *RedisBroker) History(ch string, opts HistoryOptions) ([]*Publication, StreamPosition, error) + func (b *RedisBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error) + func (b *RedisBroker) PublishControl(data []byte, nodeID string, _ string) error + func (b *RedisBroker) PublishJoin(ch string, info *ClientInfo) error + func (b *RedisBroker) PublishLeave(ch string, info *ClientInfo) error + func (b *RedisBroker) RemoveHistory(ch string) error + func (b *RedisBroker) Run(h BrokerEventHandler) error + func (b *RedisBroker) Subscribe(ch string) error + func (b *RedisBroker) Unsubscribe(ch string) error + type RedisBrokerConfig struct + Prefix string + Shards []*RedisShard + UseLists bool + type RedisPresenceManager struct + func NewRedisPresenceManager(n *Node, config RedisPresenceManagerConfig) (*RedisPresenceManager, error) + func (m *RedisPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error + func (m *RedisPresenceManager) Close(_ context.Context) error + func (m *RedisPresenceManager) Presence(ch string) (map[string]*ClientInfo, error) + func (m *RedisPresenceManager) PresenceStats(ch string) (PresenceStats, error) + func (m *RedisPresenceManager) RemovePresence(ch string, uid string) error + type RedisPresenceManagerConfig struct + Prefix string + PresenceTTL time.Duration + Shards []*RedisShard + type RedisShard struct + func NewRedisShard(_ *Node, conf RedisShardConfig) (*RedisShard, error) + func (s *RedisShard) Close() + type RedisShardConfig struct + Address string + ClientName string + ClusterAddresses []string + ConnectTimeout time.Duration + DB int + ForceRESP2 bool + IOTimeout time.Duration + Password string + SentinelAddresses []string + SentinelClientName string + SentinelMasterName string + SentinelPassword string + SentinelTLSConfig *tls.Config + SentinelUser string + TLSConfig *tls.Config + User string + type RefreshCallback func(RefreshReply, error) + type RefreshEvent struct + ClientSideRefresh bool + Token string + type RefreshHandler func(RefreshEvent, RefreshCallback) + type RefreshOption func(options *RefreshOptions) + func WithRefreshClient(clientID string) RefreshOption + func WithRefreshExpireAt(expireAt int64) RefreshOption + func WithRefreshExpired(expired bool) RefreshOption + func WithRefreshInfo(info []byte) RefreshOption + func WithRefreshSession(sessionID string) RefreshOption + type RefreshOptions struct + ExpireAt int64 + Expired bool + Info []byte + type RefreshReply struct + ExpireAt int64 + Expired bool + Info []byte + type SSEConfig struct + MaxRequestBodySize int + type SSEHandler struct + func NewSSEHandler(node *Node, config SSEConfig) *SSEHandler + func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) + type SockjsConfig struct + CheckOrigin func(*http.Request) bool + HandlerPrefix string + URL string + WebsocketCheckOrigin func(*http.Request) bool + WebsocketReadBufferSize int + WebsocketUseWriteBufferPool bool + WebsocketWriteBufferSize int + WebsocketWriteTimeout time.Duration + type SockjsHandler struct + func NewSockjsHandler(node *Node, config SockjsConfig) *SockjsHandler + func (s *SockjsHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) + type StateSnapshotHandler func() (any, error) + type StreamPosition struct + Epoch string + Offset uint64 + type SubRefreshCallback func(SubRefreshReply, error) + type SubRefreshEvent struct + Channel string + ClientSideRefresh bool + Token string + type SubRefreshHandler func(SubRefreshEvent, SubRefreshCallback) + type SubRefreshReply struct + ExpireAt int64 + Expired bool + Info []byte + type SubscribeCallback func(SubscribeReply, error) + type SubscribeEvent struct + Channel string + Data []byte + JoinLeave bool + Positioned bool + Recoverable bool + Token string + type SubscribeHandler func(SubscribeEvent, SubscribeCallback) + type SubscribeOption func(*SubscribeOptions) + func WithChannelInfo(chanInfo []byte) SubscribeOption + func WithEmitJoinLeave(enabled bool) SubscribeOption + func WithEmitPresence(enabled bool) SubscribeOption + func WithExpireAt(expireAt int64) SubscribeOption + func WithPositioning(enabled bool) SubscribeOption + func WithPushJoinLeave(enabled bool) SubscribeOption + func WithRecoverSince(since *StreamPosition) SubscribeOption + func WithRecovery(enabled bool) SubscribeOption + func WithSubscribeClient(clientID string) SubscribeOption + func WithSubscribeData(data []byte) SubscribeOption + func WithSubscribeHistoryMetaTTL(metaTTL time.Duration) SubscribeOption + func WithSubscribeSession(sessionID string) SubscribeOption + func WithSubscribeSource(source uint8) SubscribeOption + type SubscribeOptions struct + ChannelInfo []byte + Data []byte + EmitJoinLeave bool + EmitPresence bool + EnablePositioning bool + EnableRecovery bool + ExpireAt int64 + HistoryMetaTTL time.Duration + PushJoinLeave bool + RecoverSince *StreamPosition + Source uint8 + type SubscribeReply struct + ClientSideRefresh bool + Options SubscribeOptions + type SubscribeRequest struct + Epoch string + Offset uint64 + Recover bool + type SurveyCallback func(SurveyReply) + type SurveyEvent struct + Data []byte + Op string + type SurveyHandler func(SurveyEvent, SurveyCallback) + type SurveyReply struct + Code uint32 + Data []byte + type SurveyResult struct + Code uint32 + Data []byte + type Transport interface + Close func(Disconnect) error + Write func([]byte) error + WriteMany func(...[]byte) error + type TransportInfo interface + DisabledPushFlags func() uint64 + Emulation func() bool + Name func() string + PingPongConfig func() PingPongConfig + Protocol func() ProtocolType + ProtocolVersion func() ProtocolVersion + Unidirectional func() bool + type TransportWriteEvent struct + Data []byte + type TransportWriteHandler func(*Client, TransportWriteEvent) bool + type Unsubscribe struct + Code uint32 + Reason string + func (d Unsubscribe) String() string + type UnsubscribeEvent struct + Channel string + Disconnect *Disconnect + ServerSide bool + type UnsubscribeHandler func(UnsubscribeEvent) + type UnsubscribeOption func(options *UnsubscribeOptions) + func WithCustomUnsubscribe(unsubscribe Unsubscribe) UnsubscribeOption + func WithUnsubscribeClient(clientID string) UnsubscribeOption + func WithUnsubscribeSession(sessionID string) UnsubscribeOption + type UnsubscribeOptions struct + type WebsocketConfig struct + CheckOrigin func(r *http.Request) bool + Compression bool + CompressionLevel int + CompressionMinSize int + MessageSizeLimit int + ReadBufferSize int + UseWriteBufferPool bool + WriteBufferSize int + WriteTimeout time.Duration + type WebsocketHandler struct + func NewWebsocketHandler(node *Node, config WebsocketConfig) *WebsocketHandler + func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)