centrifuge

package module
v0.0.3
Warning

This package is not in the latest version of its module.

Published: Apr 15, 2019 License: MIT Imports: 34 Imported by: 91

README


This library has no v1 release yet, so the API may change. Pin your dependency to an exact version.

The Centrifuge library is the real-time core of the Centrifugo server. It also aims to be a general-purpose real-time messaging library for the Go programming language.

Features:

  • Fast and optimized for low-latency communication with thousands of client connections
  • WebSocket with JSON or binary Protobuf protocol
  • SockJS polyfill library support for browsers where WebSocket is not available (JSON only)
  • Built-in horizontal scalability with Redis PUB/SUB, Redis sharding, Sentinel for HA
  • Possibility to register custom PUB/SUB broker, history and presence storage implementations
  • Native authentication via HTTP middleware or JWT
  • Bidirectional asynchronous message communication and RPC calls
  • Channel (room) concept to broadcast message to all channel subscribers
  • Presence information for channels (show all active clients in channel)
  • History information for channels (last messages published into channel)
  • Join/leave events for channels (aka client goes online/offline)
  • Message recovery mechanism for channels to survive short network disconnects or node restart
  • Prometheus instrumentation
  • Client libraries for main application environments (see below)

Client libraries:


Installation

To install globally into $GOPATH use:

go get -u github.com/centrifugal/centrifuge

However, the recommended way is to use a tool such as dep or go mod to add this library as a dependency of your project.

Quick example

Let's look at how to build a very simple real-time chat with the Centrifuge library. Clients will be able to open a page in a browser, connect to the server over Websocket, and send a message into a channel; this message will be instantly delivered to all active channel subscribers. On the server side we will accept all connections and act as a simple PUB/SUB proxy without worrying too much about permissions. In this example we will use the Centrifuge JavaScript client on the frontend.

Create file main.go with the following code:

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	// Import this library.
	"github.com/centrifugal/centrifuge"
)

func handleLog(e centrifuge.LogEntry) {
	log.Printf("%s: %v", e.Message, e.Fields)
}

// Wait until the program is interrupted, then gracefully shut down the Node.
func waitExitSignal(n *centrifuge.Node) {
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		n.Shutdown(ctx)
		done <- true
	}()
	<-done
}

func main() {
	// We use default config here as starting point. Default config contains
	// reasonable values for available options.
	cfg := centrifuge.DefaultConfig
	// In this example we want client to do all possible actions with server
	// without any authentication and authorization. Insecure flag DISABLES
	// many security related checks in library. This is only to make example
	// short. In real app you most probably want authenticate and authorize
	// access to server. See godoc and examples in repo for more details.
	cfg.ClientInsecure = true
	// By default clients can not publish messages into channels. Setting this
	// option to true we allow them to publish.
	cfg.Publish = true

	// Centrifuge library exposes logs with different log level. In your app
	// you can set special function to handle these log entries in a way you want.
	cfg.LogLevel = centrifuge.LogLevelDebug
	cfg.LogHandler = handleLog

	// Node is the core object in Centrifuge library responsible for many useful
	// things. Here we initialize new Node instance and pass config to it.
	node, err := centrifuge.New(cfg)
	if err != nil {
		// Do not ignore the error: a nil node would panic later on.
		panic(err)
	}

	// ClientConnected node event handler is a point where you generally create a 
	// binding between Centrifuge and your app business logic. Callback function you 
	// pass here will be called every time new connection established with server. 
	// Inside this callback function you can set various event handlers for connection.
	node.On().ClientConnected(func(ctx context.Context, client *centrifuge.Client) {
		// Set Subscribe Handler to react on every channel subscription attempt
		// initiated by client. Here you can theoretically return an error or
		// disconnect client from server if needed. But now we just accept
		// all subscriptions.
		client.On().Subscribe(func(e centrifuge.SubscribeEvent) centrifuge.SubscribeReply {
			log.Printf("client subscribes on channel %s", e.Channel)
			return centrifuge.SubscribeReply{}
		})

		// Set Publish Handler to react on every channel Publication sent by client.
		// Inside this method you can validate client permissions to publish into
		// channel. But in our simple chat app we allow everyone to publish into
		// any channel.
		client.On().Publish(func(e centrifuge.PublishEvent) centrifuge.PublishReply {
			log.Printf("client publishes into channel %s: %s", e.Channel, string(e.Data))
			return centrifuge.PublishReply{}
		})

		// Set Disconnect Handler to react on client disconnect events.
		client.On().Disconnect(func(e centrifuge.DisconnectEvent) centrifuge.DisconnectReply {
			log.Printf("client disconnected")
			return centrifuge.DisconnectReply{}
		})

		// In our example transport will always be Websocket but it can also be SockJS.
		transportName := client.Transport().Name()
		// In our example clients connect with JSON protocol but it can also be Protobuf.
		transportEncoding := client.Transport().Encoding()

		log.Printf("client connected via %s (%s)", transportName, transportEncoding)
	})

	// Run node.
	if err := node.Run(); err != nil {
		panic(err)
	}

	// Configure http routes.

	// The first route is for handling Websocket connections.
	http.Handle("/connection/websocket", centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))

	// The second route is for serving index.html file.
	http.Handle("/", http.FileServer(http.Dir("./")))

	// Start HTTP server.
	go func() {
		if err := http.ListenAndServe(":8000", nil); err != nil {
			panic(err)
		}
	}()

	// Run program until interrupted.
	waitExitSignal(node)
}

Also create an index.html file next to main.go with the following content:

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
        <script type="text/javascript" src="https://rawgit.com/centrifugal/centrifuge-js/master/dist/centrifuge.min.js"></script>
    </head>
    <body>
        <input type="text" id="input" />
        <script type="text/javascript">
            // Create Centrifuge object with Websocket endpoint address set in main.go
            var centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket');
            function drawText(text) {
                var div = document.createElement('div');
                div.innerHTML = text + '<br>';
                document.body.appendChild(div);
            }
            centrifuge.on('connect', function(ctx){
                drawText('Connected over ' + ctx.transport);
            });
            centrifuge.on('disconnect', function(ctx){
                drawText('Disconnected: ' + ctx.reason);
            });
            var sub = centrifuge.subscribe("chat", function(message) {
                drawText(JSON.stringify(message));
            })
            var input = document.getElementById("input");
            input.addEventListener('keyup', function(e) {
                if (e.keyCode == 13) { // ENTER key pressed
                    sub.publish(this.value);
                    input.value = '';
                }
            });
            // After setting event handlers – initiate actual connection with server.
            centrifuge.connect();
        </script>
    </body>
</html>

Then run Go program as usual:

go run main.go

Open several browser tabs with http://localhost:8000 and see chat in action.

This example is only the tip of the iceberg, but it should give you some insight into the library API.

Keep in mind that the Centrifuge library is not a framework for building chat apps. It's a general-purpose real-time transport for your messages with some helpful primitives. You can build many kinds of real-time apps on top of this library, including chats, but depending on the application you may need to write the business logic yourself.

For contributors

The library uses both dep and go mod to manage dependencies.

Here is an example of how you can clone the library and install all required dependencies locally:

mkdir -p $GOPATH/src/github.com/centrifugal
git clone https://github.com/centrifugal/centrifuge.git $GOPATH/src/github.com/centrifugal/centrifuge
cd $GOPATH/src/github.com/centrifugal/centrifuge
dep ensure

Another way is to use go get, but in this case all dependencies will be downloaded into your global $GOPATH:

go get -u github.com/centrifugal/centrifuge

Documentation

Overview

Package centrifuge is a real-time messaging library that abstracts several bidirectional transports (Websocket, SockJS) and provides primitives to build real-time applications with Go. It's also used as core of Centrifugo server.

Almost all of this library's API is goroutine-safe, except for one-time operations such as setting callback handlers.

The Centrifuge library provides several features on top of a plain Websocket implementation. See the full description in the library README on GitHub: https://github.com/centrifugal/centrifuge.

Also see the examples in the repo to see the main library concepts in action.


Constants

This section is empty.

Variables

var (
	// DisconnectNormal is clean disconnect when client cleanly closed connection.
	DisconnectNormal = &Disconnect{
		Code:      3000,
		Reason:    "normal",
		Reconnect: true,
	}
	// DisconnectShutdown sent when node is going to shut down.
	DisconnectShutdown = &Disconnect{
		Code:      3001,
		Reason:    "shutdown",
		Reconnect: true,
	}
	// DisconnectInvalidToken sent when client came with invalid token.
	DisconnectInvalidToken = &Disconnect{
		Code:      3002,
		Reason:    "invalid token",
		Reconnect: false,
	}
	// DisconnectBadRequest sent when client uses malformed protocol
	// frames or wrong order of commands.
	DisconnectBadRequest = &Disconnect{
		Code:      3003,
		Reason:    "bad request",
		Reconnect: false,
	}
	// DisconnectServerError sent when internal error occurred on server.
	DisconnectServerError = &Disconnect{
		Code:      3004,
		Reason:    "internal server error",
		Reconnect: true,
	}
	// DisconnectExpired sent when client connection expired.
	DisconnectExpired = &Disconnect{
		Code:      3005,
		Reason:    "expired",
		Reconnect: true,
	}
	// DisconnectSubExpired sent when client subscription expired.
	DisconnectSubExpired = &Disconnect{
		Code:      3006,
		Reason:    "subscription expired",
		Reconnect: true,
	}
	// DisconnectStale sent to close connection that did not become
	// authenticated in configured interval after dialing.
	DisconnectStale = &Disconnect{
		Code:      3007,
		Reason:    "stale",
		Reconnect: false,
	}
	// DisconnectSlow sent when client can't read messages fast enough.
	DisconnectSlow = &Disconnect{
		Code:      3008,
		Reason:    "slow",
		Reconnect: true,
	}
	// DisconnectWriteError sent when an error occurred while writing to
	// client connection.
	DisconnectWriteError = &Disconnect{
		Code:      3009,
		Reason:    "write error",
		Reconnect: true,
	}
	// DisconnectInsufficientState sent when server detects wrong client
	// position in channel Publication stream. Disconnect allows client
	// to restore missed publications on reconnect.
	DisconnectInsufficientState = &Disconnect{
		Code:      3010,
		Reason:    "insufficient state",
		Reconnect: true,
	}
	// DisconnectForceReconnect sent when server forcibly disconnects connection.
	DisconnectForceReconnect = &Disconnect{
		Code:      3011,
		Reason:    "force reconnect",
		Reconnect: true,
	}
	// DisconnectForceNoReconnect sent when server forcibly disconnects connection
	// and asks it to not reconnect again.
	DisconnectForceNoReconnect = &Disconnect{
		Code:      3012,
		Reason:    "force disconnect",
		Reconnect: false,
	}
)

Predefined disconnect structures used internally by the library. It is always possible to create a Disconnect with any field values on the fly.

var (
	// ErrorInternal means server error. If returned, this is a signal that
	// something went wrong with the server itself and the client is most
	// probably not at fault.
	ErrorInternal = &Error{
		Code:    100,
		Message: "internal server error",
	}
	// ErrorUnauthorized says that request is unauthorized.
	ErrorUnauthorized = &Error{
		Code:    101,
		Message: "unauthorized",
	}
	// ErrorNamespaceNotFound means that namespace in channel name does not exist.
	ErrorNamespaceNotFound = &Error{
		Code:    102,
		Message: "namespace not found",
	}
	// ErrorPermissionDenied means that access to resource not allowed.
	ErrorPermissionDenied = &Error{
		Code:    103,
		Message: "permission denied",
	}
	// ErrorMethodNotFound means that method sent in command does not exist.
	ErrorMethodNotFound = &Error{
		Code:    104,
		Message: "method not found",
	}
	// ErrorAlreadySubscribed returned when client wants to subscribe on channel
	// it already subscribed to.
	ErrorAlreadySubscribed = &Error{
		Code:    105,
		Message: "already subscribed",
	}
	// ErrorLimitExceeded says that some sort of limit exceeded, server logs should
	// give more detailed information.
	ErrorLimitExceeded = &Error{
		Code:    106,
		Message: "limit exceeded",
	}
	// ErrorBadRequest says that server can not process received
	// data because it is malformed.
	ErrorBadRequest = &Error{
		Code:    107,
		Message: "bad request",
	}
	// ErrorNotAvailable means that resource is not enabled.
	ErrorNotAvailable = &Error{
		Code:    108,
		Message: "not available",
	}
	// ErrorTokenExpired ...
	ErrorTokenExpired = &Error{
		Code:    109,
		Message: "token expired",
	}
	// ErrorExpired ...
	ErrorExpired = &Error{
		Code:    110,
		Message: "expired",
	}
)

Here we define well-known errors that can be used in client protocol replies. Library users can define their own application-specific errors. When defining a new custom error, it is recommended to use error codes > 1000, assuming that codes in the interval 0-999 are reserved by Centrifuge.
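As a rough illustration of the recommendation above, the following sketch defines application-specific errors with codes above 1000. The Error struct here is a local stand-in that mirrors only the Code and Message fields shown above (the field types are an assumption), and ErrorRoomFull, ErrorMuted and isReserved are hypothetical names that are not part of the library.

```go
package main

import "fmt"

// Error is a local stand-in mirroring the Code and Message fields of the
// library's Error type. It exists only so this sketch compiles on its own;
// the field types are an assumption.
type Error struct {
	Code    uint32
	Message string
}

// Hypothetical application-specific errors. Codes start above 1000 because
// the 0-999 interval is described as reserved by Centrifuge.
var (
	ErrorRoomFull = &Error{Code: 1001, Message: "room is full"}
	ErrorMuted    = &Error{Code: 1002, Message: "user is muted"}
)

// isReserved reports whether code falls into the reserved 0-999 interval.
func isReserved(code uint32) bool {
	return code <= 999
}

func main() {
	for _, e := range []*Error{ErrorRoomFull, ErrorMuted} {
		fmt.Printf("%d %q reserved=%v\n", e.Code, e.Message, isReserved(e.Code))
	}
}
```

In a real application the same values would presumably be built from the library's own Error type instead of the stand-in.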

var (
	PushTypePublication = proto.PushTypePublication
	PushTypeJoin        = proto.PushTypeJoin
	PushTypeLeave       = proto.PushTypeLeave
)

Push types.

var DefaultConfig = Config{
	Name: "centrifuge",

	NodeInfoMetricsAggregateInterval: 60 * time.Second,

	ChannelMaxLength:         255,
	ChannelPrivatePrefix:     "$",
	ChannelNamespaceBoundary: ":",
	ChannelUserBoundary:      "#",
	ChannelUserSeparator:     ",",

	ClientInsecure:                  false,
	ClientAnonymous:                 false,
	ClientPresencePingInterval:      25 * time.Second,
	ClientPresenceExpireInterval:    60 * time.Second,
	ClientMessageWriteTimeout:       0,
	ClientPingInterval:              25 * time.Second,
	ClientExpiredCloseDelay:         25 * time.Second,
	ClientExpiredSubCloseDelay:      25 * time.Second,
	ClientStaleCloseDelay:           25 * time.Second,
	ClientChannelPositionCheckDelay: 40 * time.Second,
	ClientRequestMaxSize:            65536,
	ClientQueueMaxSize:              10485760,
	ClientChannelLimit:              128,
}

DefaultConfig is Config initialized with default values for all fields.

var (
	// ErrNoChannelOptions returned when operation can't be performed because no
	// appropriate channel options were found for channel.
	ErrNoChannelOptions = errors.New("no channel options found")
)
var (
	// ErrPublished returned to indicate that node should not publish message to broker.
	ErrPublished = errors.New("message published")
)
var LogStringToLevel = map[string]LogLevel{
	"debug": LogLevelDebug,
	"info":  LogLevelInfo,
	"error": LogLevelError,
	"none":  LogLevelNone,
}

LogStringToLevel matches level string to LogLevel.

Functions

func LogLevelToString

func LogLevelToString(l LogLevel) string

LogLevelToString transforms Level to its string representation.

func SetCredentials

func SetCredentials(ctx context.Context, creds *Credentials) context.Context

SetCredentials sets connection Credentials on the context and returns the resulting context.

Types

type Broker

type Broker interface {
	// Run called once on start when broker already set to node. At
	// this moment node is ready to process broker events.
	Run(BrokerEventHandler) error

	// Subscribe node on channel to listen all messages coming from channel.
	Subscribe(ch string) error
	// Unsubscribe node from channel to stop listening messages from it.
	Unsubscribe(ch string) error

	// Publish allows to send Publication Push into channel. Publications should
	// be delivered to all clients subscribed on this channel at the moment of
	// publishing on any Centrifuge node. The returned error reports whether
	// the publish operation failed.
	Publish(ch string, pub *Publication, opts *ChannelOptions) error
	// PublishJoin publishes Join Push message into channel.
	PublishJoin(ch string, join *Join, opts *ChannelOptions) error
	// PublishLeave publishes Leave Push message into channel.
	PublishLeave(ch string, leave *Leave, opts *ChannelOptions) error
	// PublishControl allows to send control command data to all running nodes.
	PublishControl(data []byte) error

	// Channels returns slice of currently active channels (with one or more
	// subscribers) on all running nodes. This is possible with Redis but can
	// be much harder in other PUB/SUB system. Anyway this information can only
	// be used for admin needs to better understand state of system. So it's not
	// a big problem if another Broker implementation won't support this method.
	Channels() ([]string, error)
}

Broker is responsible for PUB/SUB mechanics.

type BrokerEventHandler

type BrokerEventHandler interface {
	// Publication must register callback func to handle Publications received.
	HandlePublication(ch string, pub *Publication) error
	// Join must register callback func to handle Join messages received.
	HandleJoin(ch string, join *Join) error
	// Leave must register callback func to handle Leave messages received.
	HandleLeave(ch string, leave *Leave) error
	// Control must register callback func to handle Control data received.
	HandleControl([]byte) error
}

BrokerEventHandler can handle messages received from PUB/SUB system.

type ChannelContext

type ChannelContext struct {
	Info proto.Raw
	// contains filtered or unexported fields
}

ChannelContext contains extra context for a channel the connection is subscribed to.

type ChannelNamespace

type ChannelNamespace struct {
	// Name is a unique namespace name.
	Name string `json:"name"`

	// Options for namespace determine channel options for channels
	// belonging to this namespace.
	ChannelOptions `mapstructure:",squash"`
}

ChannelNamespace allows to create channels with different channel options.

type ChannelOptions

type ChannelOptions struct {
	// Publish enables possibility for clients to publish messages into channels.
	// Once enabled client can publish into channel and that publication will be
	// broadcasted to all current channel subscribers. You can control publishing
	// on server-side setting On().Publish callback to client connection.
	Publish bool `json:"publish"`

	// SubscribeToPublish turns on an automatic check that client subscribed
	// on channel before allow it to publish into that channel.
	SubscribeToPublish bool `mapstructure:"subscribe_to_publish" json:"subscribe_to_publish"`

	// Anonymous enables anonymous access (with empty user ID) to channel.
	// In most situations your application works with authenticated users so
	// every user has its own unique user ID. But if you provide real-time
	// features for public access you may need unauthenticated access to channels.
	// Turn on this option and use empty string as user ID.
	Anonymous bool `json:"anonymous"`

	// JoinLeave turns on join/leave messages for channels.
	// When client subscribes on channel join message sent to all
	// clients in this channel. When client leaves channel (unsubscribes)
	// leave message sent. This option does not fit well for channels with
	// many subscribers because every subscribe/unsubscribe event results
	// into join/leave event broadcast to all other active subscribers.
	JoinLeave bool `mapstructure:"join_leave" json:"join_leave"`

	// Presence turns on presence information for channels.
	// Presence is a structure with clients currently subscribed on channel.
	Presence bool `json:"presence"`

	// HistorySize determines max amount of history messages for channel,
	// 0 means no history for channel. Centrifugo history has auxiliary
	// role – it can not replace your backend persistent storage.
	HistorySize int `mapstructure:"history_size" json:"history_size"`

	// HistoryLifetime determines time in seconds until expiration for
	// history messages. As Centrifuge-based server keeps history in memory
	// (for example in process memory or in Redis process memory) it's
	// important to remove old messages to prevent unbounded memory growth.
	HistoryLifetime int `mapstructure:"history_lifetime" json:"history_lifetime"`

	// HistoryRecover enables recovery mechanism for channels. This means that
	// server will try to recover missed messages for resubscribing
	// client. This option uses publications from history and must be used
	// with reasonable HistorySize and HistoryLifetime configuration.
	HistoryRecover bool `mapstructure:"history_recover" json:"history_recover"`
}

ChannelOptions represent channel specific configuration for namespace or global channel options if set on top level of configuration.
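The json tags above suggest how these options could be loaded from a JSON document. The sketch below is a minimal illustration under that assumption: channelOptions is a local copy of the fields and json tags listed above (the mapstructure tags are left out), while parseOptions and recoverUsable are hypothetical helpers that are not part of the library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// channelOptions is a local copy of the ChannelOptions fields and json tags
// listed above, so that the sketch compiles without the library.
type channelOptions struct {
	Publish            bool `json:"publish"`
	SubscribeToPublish bool `json:"subscribe_to_publish"`
	Anonymous          bool `json:"anonymous"`
	JoinLeave          bool `json:"join_leave"`
	Presence           bool `json:"presence"`
	HistorySize        int  `json:"history_size"`
	HistoryLifetime    int  `json:"history_lifetime"`
	HistoryRecover     bool `json:"history_recover"`
}

// parseOptions decodes channel options from JSON. Fields missing from the
// input keep their zero values.
func parseOptions(data []byte) (channelOptions, error) {
	var opts channelOptions
	err := json.Unmarshal(data, &opts)
	return opts, err
}

// recoverUsable reports whether recovery has any history to work with: the
// field comment says it must be combined with a reasonable HistorySize and
// HistoryLifetime.
func recoverUsable(o channelOptions) bool {
	return o.HistoryRecover && o.HistorySize > 0 && o.HistoryLifetime > 0
}

func main() {
	raw := []byte(`{"publish": true, "history_size": 10, "history_lifetime": 60, "history_recover": true}`)
	opts, err := parseOptions(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", opts)
	fmt.Println("recovery usable:", recoverUsable(opts))
}
```

What counts as a "reasonable" size and lifetime depends on the application; the check here only rejects the zero values.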

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents client connection to server.

func (*Client) Channels

func (c *Client) Channels() map[string]ChannelContext

Channels returns a map of channels client connection currently subscribed to.

func (*Client) Close

func (c *Client) Close(disconnect *Disconnect) error

Close client connection with specific disconnect reason.

func (*Client) ID

func (c *Client) ID() string

ID returns unique client connection id.

func (*Client) On

func (c *Client) On() *ClientEventHub

On returns ClientEventHub to set various event handlers to client.

func (*Client) Send

func (c *Client) Send(data Raw) error

Send data to client connection asynchronously.

func (*Client) Transport

func (c *Client) Transport() Transport

Transport returns transport used by client connection.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ch string, resubscribe bool) error

Unsubscribe allows to unsubscribe client from channel.

func (*Client) UserID

func (c *Client) UserID() string

UserID returns user ID associated with client connection.

type ClientEventHub

type ClientEventHub struct {
	// contains filtered or unexported fields
}

ClientEventHub allows to deal with client event handlers. None of its methods are goroutine-safe; they are supposed to be called once on client connect.

func (*ClientEventHub) Disconnect

func (c *ClientEventHub) Disconnect(h DisconnectHandler)

Disconnect allows to set DisconnectHandler. DisconnectHandler called when client disconnected.

func (*ClientEventHub) Message

func (c *ClientEventHub) Message(h MessageHandler)

Message allows to set MessageHandler. MessageHandler called when client sent asynchronous message.

func (*ClientEventHub) Publish

func (c *ClientEventHub) Publish(h PublishHandler)

Publish allows to set PublishHandler. PublishHandler called when client publishes message into channel.

func (*ClientEventHub) RPC

func (c *ClientEventHub) RPC(h RPCHandler)

RPC allows to set RPCHandler. RPCHandler will be executed on every incoming RPC call.

func (*ClientEventHub) Refresh

func (c *ClientEventHub) Refresh(h RefreshHandler)

Refresh allows to set RefreshHandler. RefreshHandler called when it's time to refresh client connection credentials.

func (*ClientEventHub) SubRefresh

func (c *ClientEventHub) SubRefresh(h SubRefreshHandler)

SubRefresh allows to set SubRefreshHandler. SubRefreshHandler called when it's time to refresh client subscription.

func (*ClientEventHub) Subscribe

func (c *ClientEventHub) Subscribe(h SubscribeHandler)

Subscribe allows to set SubscribeHandler. SubscribeHandler called when client subscribes on channel.

func (*ClientEventHub) Unsubscribe

func (c *ClientEventHub) Unsubscribe(h UnsubscribeHandler)

Unsubscribe allows to set UnsubscribeHandler. UnsubscribeHandler called when client unsubscribes from channel.

type ClientInfo

type ClientInfo = proto.ClientInfo

ClientInfo is short information about client connection.

type Closer

type Closer interface {
	// Close when called should clean up used resources.
	Close(ctx context.Context) error
}

Closer is an interface that Broker, HistoryManager and PresenceManager can optionally implement if they need to close any resources on Centrifuge node shutdown.
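To make the contract more concrete, here is a minimal sketch of a resource that satisfies an interface of this shape. The closer interface is a local mirror of Closer, and worker, newWorker and shutdown are hypothetical names; how the node actually invokes Close on shutdown is not shown here.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// closer mirrors the Closer interface shown above.
type closer interface {
	Close(ctx context.Context) error
}

// worker is a hypothetical resource that owns a background goroutine.
type worker struct {
	stop chan struct{}
	done chan struct{}
}

func newWorker() *worker {
	w := &worker{stop: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(w.done)
		<-w.stop // Real code would do periodic work until asked to stop.
	}()
	return w
}

// Close asks the goroutine to stop and waits for it to finish, giving up
// when ctx is cancelled or its deadline passes. It must be called at most
// once, because closing the stop channel twice would panic.
func (w *worker) Close(ctx context.Context) error {
	close(w.stop)
	select {
	case <-w.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// shutdown closes c, allowing at most timeout for cleanup.
func shutdown(c closer, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return c.Close(ctx)
}

func main() {
	fmt.Println("close error:", shutdown(newWorker(), 3*time.Second))
}
```

Passing a context lets the caller bound how long cleanup may take, which matches the 3-second timeout used with Shutdown in the quick example.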

type Config

type Config struct {
	// Version of server – will be sent to client on connection establishment
	// phase in response to connect request.
	Version string
	// Name of this server node - must be unique, used as human readable
	// and meaningful node identifier.
	Name string
	// Secret is a secret key used to generate connection and subscription tokens.
	Secret string
	// ChannelOptions embedded.
	ChannelOptions
	// Namespaces – list of namespaces for custom channel options.
	Namespaces []ChannelNamespace
	// ClientInsecure turns on insecure mode for client connections - when it's
	// turned on then no authentication required at all when connecting to Centrifugo,
	// anonymous access and publish allowed for all channels, no connection expire
	// performed. This can be suitable for demonstration or personal usage.
	ClientInsecure bool
	// ClientAnonymous when set to true, allows connect requests without specifying
	// a token or setting Credentials in authentication middleware. The resulting
	// user will have empty string for user ID, meaning user can only subscribe
	// to anonymous channels.
	ClientAnonymous bool
	// ClientPresencePingInterval is an interval how often connected clients
	// must update presence info.
	ClientPresencePingInterval time.Duration
	// ClientPresenceExpireInterval is an interval how long to consider
	// presence info valid after receiving presence ping.
	ClientPresenceExpireInterval time.Duration
	// ClientPingInterval sets interval server will send ping messages to clients.
	ClientPingInterval time.Duration
	// ClientExpiredCloseDelay is an extra time given to client to
	// refresh its connection in the end of connection lifetime.
	ClientExpiredCloseDelay time.Duration
	// ClientExpiredSubCloseDelay is an extra time given to client to
	// refresh its expiring subscription in the end of subscription lifetime.
	ClientExpiredSubCloseDelay time.Duration
	// ClientStaleCloseDelay is a timeout after which connection will be
	// closed if still not authenticated (i.e. no valid connect command
	// received yet).
	ClientStaleCloseDelay time.Duration
	// ClientChannelPositionCheckDelay defines minimal time from previous
	// client position check in channel. If client does not pass check it will
	// be disconnected with DisconnectInsufficientState.
	ClientChannelPositionCheckDelay time.Duration
	// ClientMessageWriteTimeout is maximum time of write message operation.
	// Slow client will be disconnected. By default we don't use this option (i.e. it's 0)
	// and slow client connections will be closed when their queue size exceeds
	// ClientQueueMaxSize. In case of SockJS transport we don't have control over it so
	// it only affects raw websocket.
	ClientMessageWriteTimeout time.Duration
	// ClientRequestMaxSize sets maximum size in bytes of allowed client request.
	ClientRequestMaxSize int
	// ClientQueueMaxSize is a maximum size of client's message queue in bytes.
	// After this queue size exceeded Centrifugo closes client's connection.
	ClientQueueMaxSize int
	// ClientChannelLimit sets upper limit of channels each client can subscribe to.
	ClientChannelLimit int
	// ClientUserConnectionLimit limits number of client connections from user with the
	// same ID. 0 - unlimited.
	ClientUserConnectionLimit int
	// ChannelPrivatePrefix is a prefix in channel name which indicates that
	// channel is private.
	ChannelPrivatePrefix string
	// ChannelNamespaceBoundary is a string separator which must be put after
	// namespace part in channel name.
	ChannelNamespaceBoundary string
	// ChannelUserBoundary is a string separator which must be set before allowed
	// users part in channel name.
	ChannelUserBoundary string
	// ChannelUserSeparator separates allowed users in user part of channel name.
	ChannelUserSeparator string
	// ChannelMaxLength is a maximum length of channel name.
	ChannelMaxLength int
	// NodeInfoMetricsAggregateInterval sets interval for automatic metrics aggregation.
	// It's not very reasonable to have it less than one second.
	NodeInfoMetricsAggregateInterval time.Duration

	// LogLevel is a log level to use. By default nothing will be logged.
	LogLevel LogLevel
	// LogHandler is a handler func node will send logs to.
	LogHandler LogHandler
}

Config contains Application configuration options.
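The channel boundary options are easier to follow with a concrete name. The sketch below splits a channel name using the values from DefaultConfig; the exact parsing rules the library applies are an assumption here, and channelParts and parseChannel are hypothetical helpers that are not part of the library.

```go
package main

import (
	"fmt"
	"strings"
)

// Boundary values copied from DefaultConfig above.
const (
	privatePrefix     = "$"
	namespaceBoundary = ":"
	userBoundary      = "#"
	userSeparator     = ","
)

// channelParts is a hypothetical helper type, not part of the library.
type channelParts struct {
	Private   bool
	Namespace string
	Users     []string
}

// parseChannel splits a channel name following the descriptions of the
// Channel* options in Config. The library's real rules may differ.
func parseChannel(ch string) channelParts {
	var p channelParts
	rest := ch
	if strings.HasPrefix(rest, privatePrefix) {
		p.Private = true
		rest = strings.TrimPrefix(rest, privatePrefix)
	}
	// Everything after the user boundary is the list of allowed users.
	if i := strings.Index(rest, userBoundary); i >= 0 {
		if users := rest[i+len(userBoundary):]; users != "" {
			p.Users = strings.Split(users, userSeparator)
		}
		rest = rest[:i]
	}
	// Everything before the namespace boundary is the namespace name.
	if i := strings.Index(rest, namespaceBoundary); i >= 0 {
		p.Namespace = rest[:i]
	}
	return p
}

func main() {
	for _, ch := range []string{"chat", "news:sport", "$news:sport", "dialog#42,43"} {
		fmt.Printf("%-12s %+v\n", ch, parseChannel(ch))
	}
}
```

Under these assumptions, "$news:sport#42,43" would be a private channel in namespace "news" restricted to users 42 and 43.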

func (*Config) Validate

func (c *Config) Validate() error

Validate validates config and returns an error if problems are found.

type ConnectEvent

type ConnectEvent struct {
	// ClientID that was generated by library for client connection.
	ClientID string
	// Token received from client as part of Connect Command.
	Token string
	// Data received from client as part of Connect Command.
	Data Raw
}

ConnectEvent contains fields related to connecting event.

type ConnectReply

type ConnectReply struct {
	// Context allows to return modified context.
	Context context.Context
	// Error for connect command reply.
	Error *Error
	// Disconnect client.
	Disconnect *Disconnect
	// Credentials should be set if app wants to authenticate connection.
	// This field is still optional as auth could be provided through HTTP middleware
	// or via JWT token.
	Credentials *Credentials
	// Data allows to set custom data in connect reply.
	Data Raw
}

ConnectReply contains fields determining the reaction on auth event.

type ConnectedHandler added in v0.0.2

type ConnectedHandler func(context.Context, *Client)

ConnectedHandler called when new client connects to server.

type ConnectingHandler added in v0.0.2

type ConnectingHandler func(context.Context, Transport, ConnectEvent) ConnectReply

ConnectingHandler called when new client authenticates on server.

type Credentials

type Credentials struct {
	UserID   string
	ExpireAt int64
	Info     []byte
}

Credentials allows to authenticate connection when set into context.

type Disconnect

type Disconnect struct {
	// Code is disconnect code.
	Code int `json:"-"`
	// Reason is a short description of disconnect.
	Reason string `json:"reason"`
	// Reconnect gives client an advice to reconnect after disconnect or not.
	Reconnect bool `json:"reconnect"`
}

Disconnect allows to configure how client will be disconnected from server.
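One detail worth noticing in the struct tags is that Code is tagged json:"-", so only Reason and Reconnect appear when a Disconnect is encoded as JSON. The sketch below demonstrates this with a local copy of the struct; the encode helper, the code 4000 and the "maintenance" reason are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// disconnect is a local copy of the Disconnect struct above, including its
// json tags, so the sketch compiles without the library.
type disconnect struct {
	Code      int    `json:"-"`
	Reason    string `json:"reason"`
	Reconnect bool   `json:"reconnect"`
}

// encode returns the JSON form of d. Code is tagged json:"-", so it is
// left out of the encoded payload.
func encode(d *disconnect) (string, error) {
	b, err := json.Marshal(d)
	return string(b), err
}

func main() {
	// A hypothetical application-defined disconnect created on the fly.
	maintenance := &disconnect{Code: 4000, Reason: "maintenance", Reconnect: true}
	s, err := encode(maintenance)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"reason":"maintenance","reconnect":true}
}
```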

type DisconnectEvent

type DisconnectEvent struct {
	Disconnect *Disconnect
}

DisconnectEvent contains fields related to disconnect event.

type DisconnectHandler

type DisconnectHandler func(DisconnectEvent) DisconnectReply

DisconnectHandler called when client disconnects from server.

type DisconnectReply

type DisconnectReply struct{}

DisconnectReply contains fields determining the reaction on disconnect event.

type Encoding

type Encoding = proto.Encoding

Encoding represents client connection transport encoding format.

type Engine

type Engine interface {
	Broker
	HistoryManager
	PresenceManager
}

Engine is responsible for PUB/SUB mechanics, channel history and presence information.

type Error

type Error = proto.Error

Error represents client reply error.

type HistoryFilter

type HistoryFilter struct {
	// Since used to recover missed messages since provided RecoveryPosition.
	Since *RecoveryPosition
	// Limit number of publications to return.
	Limit int
}

HistoryFilter allows to filter history according to fields set.

type HistoryManager

type HistoryManager interface {
	// History returns a slice of publications published into channel.
	// HistoryFilter allows to set several filtering options.
	// History must return Publications with Seq and Gen set.
	History(ch string, filter HistoryFilter) ([]*Publication, RecoveryPosition, error)
	// AddHistory adds Publication to channel history. Storage should
	// automatically maintain history size and lifetime according to
	// channel options if needed.
	// The returned value is Publication ready to be published to Broker.
	// If returned Publication is nil then node will not try to publish
	// it to Broker at all. This is useful for situations when engine can
	// atomically save Publication to history and publish it to channel.
	AddHistory(ch string, pub *Publication, opts *ChannelOptions) (*Publication, error)
	// RemoveHistory removes history from channel. This is in general not
	// needed as history expires automatically (based on history_lifetime)
	// but sometimes can be useful for application logic.
	RemoveHistory(ch string) error
}

HistoryManager is responsible for dealing with channel history management.
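As a rough illustration of the HistoryManager contract, here is a sketch of an in-memory implementation. It uses local stand-in types; the HistorySize field of ChannelOptions is an assumption, as ChannelOptions is not described in this section. Returning publications oldest first, the meaning given to Limit, and leaving Gen at zero (no overflow handling) are choices of the sketch, not statements about the built-in engines.

```go
package main

import (
	"fmt"
	"sync"
)

// Stand-ins for the library types. The HistorySize field of ChannelOptions
// is an assumption; ChannelOptions is not described in this section.
type (
	Publication struct {
		Seq, Gen uint32
		Data     []byte
	}
	RecoveryPosition struct {
		Seq, Gen uint32
		Epoch    string
	}
	HistoryFilter struct {
		Since *RecoveryPosition
		Limit int
	}
	ChannelOptions struct{ HistorySize int }
)

// memHistory keeps per-channel publications in memory, oldest first.
type memHistory struct {
	mu    sync.Mutex
	epoch string
	seq   map[string]uint32
	pubs  map[string][]*Publication
}

func newMemHistory(epoch string) *memHistory {
	return &memHistory{epoch: epoch, seq: map[string]uint32{}, pubs: map[string][]*Publication{}}
}

// AddHistory assigns the next Seq and trims the list to HistorySize.
func (m *memHistory) AddHistory(ch string, pub *Publication, opts *ChannelOptions) (*Publication, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seq[ch]++
	pub.Seq = m.seq[ch]
	list := append(m.pubs[ch], pub)
	if opts != nil && opts.HistorySize > 0 && len(list) > opts.HistorySize {
		list = list[len(list)-opts.HistorySize:]
	}
	m.pubs[ch] = list
	return pub, nil
}

// History returns publications newer than filter.Since, capped at filter.Limit.
func (m *memHistory) History(ch string, filter HistoryFilter) ([]*Publication, RecoveryPosition, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	var out []*Publication
	for _, p := range m.pubs[ch] {
		if filter.Since == nil || p.Seq > filter.Since.Seq {
			out = append(out, p)
		}
	}
	if filter.Limit > 0 && len(out) > filter.Limit {
		out = out[:filter.Limit]
	}
	return out, RecoveryPosition{Seq: m.seq[ch], Epoch: m.epoch}, nil
}

// RemoveHistory drops stored publications but keeps the sequence counter.
func (m *memHistory) RemoveHistory(ch string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.pubs, ch)
	return nil
}

func main() {
	h := newMemHistory("epoch-1")
	for i := 0; i < 5; i++ {
		h.AddHistory("news", &Publication{Data: []byte("msg")}, &ChannelOptions{HistorySize: 3})
	}
	pubs, pos, _ := h.History("news", HistoryFilter{Since: &RecoveryPosition{Seq: 3}})
	fmt.Println(len(pubs), pos.Seq) // 2 5
}
```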

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages client connections.

func (*Hub) Channels

func (h *Hub) Channels() []string

Channels returns a slice of all active channels.

func (*Hub) NumChannels

func (h *Hub) NumChannels() int

NumChannels returns a total number of different channels.

func (*Hub) NumClients

func (h *Hub) NumClients() int

NumClients returns total number of client connections.

func (*Hub) NumSubscribers

func (h *Hub) NumSubscribers(ch string) int

NumSubscribers returns number of current subscribers for a given channel.

func (*Hub) NumUsers

func (h *Hub) NumUsers() int

NumUsers returns a number of unique users connected.

type Info

type Info struct {
	Nodes []NodeInfo
}

Info contains information about all known server nodes.

type Join

type Join = proto.Join

Join sent to channel after someone subscribed.

type Leave

type Leave = proto.Leave

Leave sent to channel after someone unsubscribed.

type LogEntry

type LogEntry struct {
	Level   LogLevel
	Message string
	Fields  map[string]interface{}
}

LogEntry represents log entry.

func NewLogEntry

func NewLogEntry(level LogLevel, message string, fields ...map[string]interface{}) LogEntry

NewLogEntry creates new LogEntry.

type LogHandler

type LogHandler func(LogEntry)

LogHandler handles log entries - i.e. writes into correct destination if necessary.

type LogLevel

type LogLevel int

LogLevel describes the chosen log level.

const (
	// LogLevelNone means no logging.
	LogLevelNone LogLevel = iota
	// LogLevelDebug turns on debug logs. It's generally too verbose for production in normal
	// conditions but can help when developing and investigating problems in production.
	LogLevelDebug
	// LogLevelInfo logs useful server information. This includes various information
	// about problems with client connections which are not Centrifugo errors but
	// in most situations result from malformed client behaviour.
	LogLevelInfo
	// LogLevelError logs only server errors. Such entries indicate that Centrifugo is
	// not working properly and may require effort from developers/administrators to
	// make things work again.
	LogLevelError
)
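A LogHandler receives every LogEntry and decides where it goes. The sketch below uses local copies of the declarations above and shows one possible handler that drops entries below a minimum level and renders the rest on a single line. The level labels, the output format, and the helper names (levelName, formatEntry, newHandler) are hypothetical and not part of the library.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Local copies of the documented declarations.
type LogLevel int

const (
	LogLevelNone LogLevel = iota
	LogLevelDebug
	LogLevelInfo
	LogLevelError
)

type LogEntry struct {
	Level   LogLevel
	Message string
	Fields  map[string]interface{}
}

type LogHandler func(LogEntry)

// levelName maps a level to a short label; the labels are this sketch's choice.
func levelName(l LogLevel) string {
	switch l {
	case LogLevelDebug:
		return "debug"
	case LogLevelInfo:
		return "info"
	case LogLevelError:
		return "error"
	}
	return "none"
}

// formatEntry renders an entry on one line with fields sorted by key.
func formatEntry(e LogEntry) string {
	keys := make([]string, 0, len(e.Fields))
	for k := range e.Fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var b strings.Builder
	fmt.Fprintf(&b, "[%s] %s", levelName(e.Level), e.Message)
	for _, k := range keys {
		fmt.Fprintf(&b, " %s=%v", k, e.Fields[k])
	}
	return b.String()
}

// newHandler returns a LogHandler that passes entries at or above minLevel
// to sink. LogLevelNone disables output entirely.
func newHandler(minLevel LogLevel, sink func(string)) LogHandler {
	return func(e LogEntry) {
		if minLevel == LogLevelNone || e.Level < minLevel {
			return
		}
		sink(formatEntry(e))
	}
}

func main() {
	h := newHandler(LogLevelInfo, func(s string) { fmt.Println(s) })
	h(LogEntry{Level: LogLevelDebug, Message: "dropped"})
	h(LogEntry{Level: LogLevelError, Message: "engine failure", Fields: map[string]interface{}{"shard": 0}})
	// prints: [error] engine failure shard=0
}
```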

type MemoryEngine

type MemoryEngine struct {
	// contains filtered or unexported fields
}

MemoryEngine is the built-in default engine which allows running a Centrifuge-based server without any external brokers or storages. All data is managed in process memory.

With this engine you can only run a single Centrifuge node. If you need to scale, use another engine implementation instead, for example the Redis engine.

Running a single node can be sufficient for many use cases, especially when you need maximum performance and do not have too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.

func NewMemoryEngine

func NewMemoryEngine(n *Node, conf MemoryEngineConfig) (*MemoryEngine, error)

NewMemoryEngine initializes Memory Engine.

func (*MemoryEngine) AddHistory

func (e *MemoryEngine) AddHistory(ch string, pub *Publication, opts *ChannelOptions) (*Publication, error)

AddHistory - see engine interface description.

func (*MemoryEngine) AddPresence

func (e *MemoryEngine) AddPresence(ch string, uid string, info *ClientInfo, exp time.Duration) error

AddPresence - see engine interface description.

func (*MemoryEngine) Channels

func (e *MemoryEngine) Channels() ([]string, error)

Channels - see engine interface description.

func (*MemoryEngine) History

func (e *MemoryEngine) History(ch string, filter HistoryFilter) ([]*Publication, RecoveryPosition, error)

History - see engine interface description.

func (*MemoryEngine) Presence

func (e *MemoryEngine) Presence(ch string) (map[string]*ClientInfo, error)

Presence - see engine interface description.

func (*MemoryEngine) PresenceStats

func (e *MemoryEngine) PresenceStats(ch string) (PresenceStats, error)

PresenceStats - see engine interface description.

func (*MemoryEngine) Publish

func (e *MemoryEngine) Publish(ch string, pub *Publication, opts *ChannelOptions) error

Publish adds message into history hub and calls node ClientMsg method to handle message. We don't have any PUB/SUB here as Memory Engine is single node only.

func (*MemoryEngine) PublishControl

func (e *MemoryEngine) PublishControl(data []byte) error

PublishControl - see Engine interface description.

func (*MemoryEngine) PublishJoin

func (e *MemoryEngine) PublishJoin(ch string, join *Join, opts *ChannelOptions) error

PublishJoin - see engine interface description.

func (*MemoryEngine) PublishLeave

func (e *MemoryEngine) PublishLeave(ch string, leave *Leave, opts *ChannelOptions) error

PublishLeave - see engine interface description.

func (*MemoryEngine) RemoveHistory

func (e *MemoryEngine) RemoveHistory(ch string) error

RemoveHistory - see engine interface description.

func (*MemoryEngine) RemovePresence

func (e *MemoryEngine) RemovePresence(ch string, uid string) error

RemovePresence - see engine interface description.

func (*MemoryEngine) Run

Run runs the memory engine. There is no logic here, as Memory Engine is ready to work right after initialization.

func (*MemoryEngine) Subscribe

func (e *MemoryEngine) Subscribe(ch string) error

Subscribe is noop here.

func (*MemoryEngine) Unsubscribe

func (e *MemoryEngine) Unsubscribe(ch string) error

Unsubscribe node from channel.

type MemoryEngineConfig

type MemoryEngineConfig struct{}

MemoryEngineConfig is a memory engine config.

type MessageEvent

type MessageEvent struct {
	Data Raw
}

MessageEvent contains fields related to message request.

type MessageHandler

type MessageHandler func(MessageEvent) MessageReply

MessageHandler must handle incoming async message from client.

type MessageReply

type MessageReply struct {
	Disconnect *Disconnect
}

MessageReply contains fields determining the reaction on message request.

type Metrics

type Metrics struct {
	Interval float64
	Items    map[string]float64
}

Metrics aggregation over time interval for node.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is the heart of the centrifuge library: it internally keeps and manages client connections, maintains information about other centrifuge nodes, and keeps useful references to things like the engine and the hub.

func New

func New(c Config) (*Node, error)

New creates Node, the only required argument is config.

func (*Node) ChannelOpts

func (n *Node) ChannelOpts(ch string) (ChannelOptions, bool)

ChannelOpts returns channel options for channel using current channel config.

func (*Node) Channels

func (n *Node) Channels() ([]string, error)

Channels returns a list of all channels currently active across all nodes. This is a snapshot of state, mostly useful for understanding what's going on with the system.

func (*Node) Config

func (n *Node) Config() Config

Config returns a copy of node Config.

func (*Node) Disconnect

func (n *Node) Disconnect(user string, reconnect bool) error

Disconnect closes all connections of the given user to Centrifugo.

func (*Node) History

func (n *Node) History(ch string) ([]*Publication, error)

History returns a slice of last messages published into project channel.

func (*Node) Hub

func (n *Node) Hub() *Hub

Hub returns node's Hub.

func (*Node) Info

func (n *Node) Info() (Info, error)

Info returns aggregated stats from all nodes.

func (*Node) Log

func (n *Node) Log(entry LogEntry)

Log allows to log entry.

func (*Node) LogEnabled

func (n *Node) LogEnabled(level LogLevel) bool

LogEnabled reports whether logging is enabled for the given level.

func (*Node) NotifyShutdown

func (n *Node) NotifyShutdown() chan struct{}

NotifyShutdown returns a channel which will be closed on node shutdown.

func (*Node) On

func (n *Node) On() NodeEventHub

On allows access to NodeEventHub.

func (*Node) Presence

func (n *Node) Presence(ch string) (map[string]*ClientInfo, error)

Presence returns a map with information about active clients in channel.

func (*Node) PresenceStats

func (n *Node) PresenceStats(ch string) (PresenceStats, error)

PresenceStats returns presence stats from engine.

func (*Node) Publish

func (n *Node) Publish(ch string, data []byte, opts ...PublishOption) error

Publish sends data to all clients subscribed on channel. All running nodes will receive it and will send it to all clients on node subscribed on channel.

func (*Node) Reload

func (n *Node) Reload(c Config) error

Reload node config.

func (*Node) RemoveHistory

func (n *Node) RemoveHistory(ch string) error

RemoveHistory removes channel history.

func (*Node) Run

func (n *Node) Run() error

Run performs node startup actions. At the moment it must be called once on start, after the engine has been set on the Node.

func (*Node) SetBroker

func (n *Node) SetBroker(b Broker)

SetBroker allows to set Broker implementation to use.

func (*Node) SetEngine

func (n *Node) SetEngine(e Engine)

SetEngine binds Engine to node.

func (*Node) SetHistoryManager

func (n *Node) SetHistoryManager(m HistoryManager)

SetHistoryManager allows to set HistoryManager to use.

func (*Node) SetPresenceManager

func (n *Node) SetPresenceManager(m PresenceManager)

SetPresenceManager allows to set PresenceManager to use.

func (*Node) Shutdown

func (n *Node) Shutdown(ctx context.Context) error

Shutdown sets the shutdown flag on the Node so that handlers can stop accepting new requests, and disconnects clients with a shutdown reason.

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(user string, ch string) error

Unsubscribe unsubscribes user from channel. If channel is an empty string, the user will be unsubscribed from all channels.

type NodeEventHub

type NodeEventHub interface {
	// ClientConnecting registers a handler called when client sends Connect command to
	// server. In this handler the application can reject connection or provide Credentials for it.
	ClientConnecting(handler ConnectingHandler)
	// ClientConnected registers a handler called after client connection has been
	// successfully established, authenticated and connect reply already sent to client.
	// This is a place where application should set all required connection event
	// callbacks and can start communicating with client.
	ClientConnected(handler ConnectedHandler)
}

NodeEventHub can deal with events bound to Node. None of its methods are goroutine-safe, as handlers must be registered once before the Node Run method is called.

type NodeInfo

type NodeInfo struct {
	UID         string
	Name        string
	Version     string
	NumClients  uint32
	NumUsers    uint32
	NumChannels uint32
	Uptime      uint32
	Metrics     *Metrics
}

NodeInfo contains information about node.

type PresenceManager

type PresenceManager interface {
	// Presence returns actual presence information for channel.
	Presence(ch string) (map[string]*ClientInfo, error)
	// PresenceStats returns short stats of current presence data
	// suitable for scenarios when caller does not need full client
	// info returned by presence method.
	PresenceStats(ch string) (PresenceStats, error)
	// AddPresence sets or updates presence information in channel
	// for connection with specified identifier. Engine should have a
	// property to expire client information that was not updated
	// (touched) after some configured time interval.
	AddPresence(ch string, clientID string, info *ClientInfo, expire time.Duration) error
	// RemovePresence removes presence information for connection
	// with specified identifier.
	RemovePresence(ch string, clientID string) error
}

PresenceManager is responsible for channel presence management.
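To make the expiration requirement in the AddPresence comment concrete, here is a sketch of an in-memory PresenceManager that drops entries which were not refreshed before their expiration time. The types are local stand-ins, the ClientInfo fields are assumptions, and the injectable clock, the fakeClock helper, and the ttl constant exist only to make the sketch easy to follow.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stand-ins for library types; the ClientInfo fields are assumptions.
type ClientInfo struct{ User, Client string }

type PresenceStats struct{ NumClients, NumUsers int }

// ttl is an arbitrary expiration interval chosen for this sketch.
const ttl = 25 * time.Second

type entry struct {
	info    *ClientInfo
	expires time.Time
}

// memPresence stores presence per channel and drops stale entries on read.
type memPresence struct {
	mu   sync.Mutex
	now  func() time.Time // injectable clock
	data map[string]map[string]entry
}

func newMemPresence(now func() time.Time) *memPresence {
	return &memPresence{now: now, data: map[string]map[string]entry{}}
}

func (m *memPresence) AddPresence(ch, clientID string, info *ClientInfo, expire time.Duration) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.data[ch] == nil {
		m.data[ch] = map[string]entry{}
	}
	m.data[ch][clientID] = entry{info: info, expires: m.now().Add(expire)}
	return nil
}

func (m *memPresence) RemovePresence(ch, clientID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.data[ch], clientID)
	return nil
}

func (m *memPresence) Presence(ch string) (map[string]*ClientInfo, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	now, out := m.now(), map[string]*ClientInfo{}
	for id, e := range m.data[ch] {
		if now.After(e.expires) {
			delete(m.data[ch], id) // not touched in time: expire it
			continue
		}
		out[id] = e.info
	}
	return out, nil
}

func (m *memPresence) PresenceStats(ch string) (PresenceStats, error) {
	clients, _ := m.Presence(ch)
	users := map[string]struct{}{}
	for _, info := range clients {
		users[info.User] = struct{}{}
	}
	return PresenceStats{NumClients: len(clients), NumUsers: len(users)}, nil
}

// fakeClock is a manually advanced clock for demonstration.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

func main() {
	clk := &fakeClock{t: time.Now()}
	p := newMemPresence(clk.now)
	p.AddPresence("chat", "c1", &ClientInfo{User: "u1", Client: "c1"}, ttl)
	p.AddPresence("chat", "c2", &ClientInfo{User: "u1", Client: "c2"}, 3*ttl)
	s, _ := p.PresenceStats("chat")
	fmt.Println(s.NumClients, s.NumUsers) // 2 1
	clk.advance(2 * ttl)
	s, _ = p.PresenceStats("chat")
	fmt.Println(s.NumClients, s.NumUsers) // 1 1
}
```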

type PresenceStats

type PresenceStats struct {
	NumClients int
	NumUsers   int
}

PresenceStats represents a short presence information for channel.

type Publication

type Publication = proto.Publication

Publication allows to deliver custom payload to all channel subscribers.

type PublishEvent

type PublishEvent struct {
	Channel string
	Data    Raw
	Info    *ClientInfo
}

PublishEvent contains fields related to publish event.

type PublishHandler

type PublishHandler func(PublishEvent) PublishReply

PublishHandler is called when a client publishes into a channel.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a type to represent various Publish options.

func SkipHistory

func SkipHistory() PublishOption

SkipHistory allows to set SkipHistory to true.

type PublishOptions

type PublishOptions struct {
	// SkipHistory allows to prevent saving specific Publication to channel history.
	SkipHistory bool
}

PublishOptions define some fields to alter behaviour of Publish operation.
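PublishOption and PublishOptions follow the shape of the functional options pattern. How Node.Publish applies the options internally is not shown here, so the sketch below only illustrates the pattern itself with local copies of the types; the resolve helper is hypothetical.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type PublishOptions struct {
	SkipHistory bool
}

type PublishOption func(*PublishOptions)

// SkipHistory returns an option that sets SkipHistory to true.
func SkipHistory() PublishOption {
	return func(o *PublishOptions) { o.SkipHistory = true }
}

// resolve is a hypothetical helper: it starts from zero-value options
// and applies each option in order.
func resolve(opts ...PublishOption) PublishOptions {
	var o PublishOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(resolve().SkipHistory)              // false
	fmt.Println(resolve(SkipHistory()).SkipHistory) // true
}
```

With the real library the call site would look like passing SkipHistory() as the trailing argument of Node.Publish, per the variadic opts parameter in its signature.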

type PublishReply

type PublishReply struct {
	Error      *Error
	Disconnect *Disconnect
}

PublishReply contains fields determining the reaction on publish event.

type Push

type Push = proto.Push

Push wraps Publication, Join or Leave.

type RPCEvent

type RPCEvent struct {
	Data Raw
}

RPCEvent contains fields related to rpc request.

type RPCHandler

type RPCHandler func(RPCEvent) RPCReply

RPCHandler must handle incoming command from client.

type RPCReply

type RPCReply struct {
	Error      *Error
	Disconnect *Disconnect
	Data       Raw
}

RPCReply contains fields determining the reaction on rpc request.

type Raw

type Raw = proto.Raw

Raw represents raw bytes.

type RecoveryPosition

type RecoveryPosition struct {
	// Seq defines publication sequence.
	Seq uint32
	// Gen defines publication sequence generation. The reason why we use both seq and
	// gen is the fact that Javascript can't properly work with big numbers. As we
	// support not only JSON but also the Protobuf protocol format, the decision was made
	// to be effective in serialization size and not pass sequences as strings.
	Gen uint32
	// Epoch of sequence and generation. Allows to handle situations when storage
	// lost seq and gen for some reason and we don't want to improperly decide
	// that publications were successfully recovered.
	Epoch string
}

RecoveryPosition contains fields to rely on in the recovery process. More info about recovery in docs: https://centrifugal.github.io/centrifugo/server/recover/
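The Gen comment above explains why a position is split into two 32-bit numbers instead of one large one. One way to picture the relationship is to treat Gen as the high-order half of a 64-bit counter, so that Gen increases by one whenever Seq wraps around. This interpretation is an assumption of the sketch; the exact arithmetic used by the library is not shown in this section and may differ. The helper names pack, unpack and next are hypothetical.

```go
package main

import "fmt"

// pack combines seq and gen into one 64-bit position, with gen as the
// high 32 bits. This layout is an assumption of the sketch.
func pack(seq, gen uint32) uint64 {
	return uint64(gen)<<32 | uint64(seq)
}

// unpack splits a 64-bit position back into seq and gen.
func unpack(v uint64) (seq, gen uint32) {
	return uint32(v), uint32(v >> 32)
}

// next returns the position following (seq, gen), carrying into gen
// when seq overflows.
func next(seq, gen uint32) (uint32, uint32) {
	return unpack(pack(seq, gen) + 1)
}

func main() {
	fmt.Println(next(7, 0))          // 8 0
	fmt.Println(next(4294967295, 0)) // 0 1
	fmt.Println(pack(5, 2))          // 8589934597
}
```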

type RedisEngine

type RedisEngine struct {
	// contains filtered or unexported fields
}

RedisEngine uses Redis data structures and PUB/SUB to manage Centrifugo logic. This engine allows scaling Centrifugo: you can run several Centrifugo instances connected to the same Redis and load balance clients between instances.

func NewRedisEngine

func NewRedisEngine(n *Node, config RedisEngineConfig) (*RedisEngine, error)

NewRedisEngine initializes Redis Engine.

func (*RedisEngine) AddHistory

func (e *RedisEngine) AddHistory(ch string, pub *Publication, opts *ChannelOptions) (*Publication, error)

AddHistory - see engine interface description.

func (*RedisEngine) AddPresence

func (e *RedisEngine) AddPresence(ch string, uid string, info *ClientInfo, exp time.Duration) error

AddPresence - see engine interface description.

func (*RedisEngine) Channels

func (e *RedisEngine) Channels() ([]string, error)

Channels - see engine interface description.

func (*RedisEngine) History

func (e *RedisEngine) History(ch string, filter HistoryFilter) ([]*Publication, RecoveryPosition, error)

History - see engine interface description.

func (*RedisEngine) Presence

func (e *RedisEngine) Presence(ch string) (map[string]*ClientInfo, error)

Presence - see engine interface description.

func (*RedisEngine) PresenceStats

func (e *RedisEngine) PresenceStats(ch string) (PresenceStats, error)

PresenceStats - see engine interface description.

func (*RedisEngine) Publish

func (e *RedisEngine) Publish(ch string, pub *Publication, opts *ChannelOptions) error

Publish - see engine interface description.

func (*RedisEngine) PublishControl

func (e *RedisEngine) PublishControl(data []byte) error

PublishControl - see engine interface description.

func (*RedisEngine) PublishJoin

func (e *RedisEngine) PublishJoin(ch string, join *Join, opts *ChannelOptions) error

PublishJoin - see engine interface description.

func (*RedisEngine) PublishLeave

func (e *RedisEngine) PublishLeave(ch string, leave *Leave, opts *ChannelOptions) error

PublishLeave - see engine interface description.

func (*RedisEngine) RemoveHistory

func (e *RedisEngine) RemoveHistory(ch string) error

RemoveHistory - see engine interface description.

func (*RedisEngine) RemovePresence

func (e *RedisEngine) RemovePresence(ch string, uid string) error

RemovePresence - see engine interface description.

func (*RedisEngine) Run

Run runs engine after node initialized.

func (*RedisEngine) Subscribe

func (e *RedisEngine) Subscribe(ch string) error

Subscribe - see engine interface description.

func (*RedisEngine) Unsubscribe

func (e *RedisEngine) Unsubscribe(ch string) error

Unsubscribe - see engine interface description.

type RedisEngineConfig

type RedisEngineConfig struct {
	PublishOnHistoryAdd bool
	Shards              []RedisShardConfig
}

RedisEngineConfig is a config for Redis Engine.

type RedisShardConfig

type RedisShardConfig struct {
	// Host is Redis server host.
	Host string
	// Port is Redis server port.
	Port int
	// Password is password to use when connecting to Redis database. If empty then password not used.
	Password string
	// DB is Redis database number. If not set then database 0 used.
	DB int
	// Whether to use TLS connection or not.
	UseTLS bool
	// Whether to skip hostname verification as part of TLS handshake.
	TLSSkipVerify bool
	// Connection TLS configuration.
	TLSConfig *tls.Config
	// MasterName is a name of Redis instance master Sentinel monitors.
	MasterName string
	// SentinelAddrs is a slice of Sentinel addresses.
	SentinelAddrs []string
	// Prefix to use before every channel name and key in Redis.
	Prefix string
	// IdleTimeout is timeout after which idle connections to Redis will be closed.
	IdleTimeout time.Duration
	// PubSubNumWorkers sets how many PUB/SUB message processing workers will be started.
	// By default we start runtime.NumCPU() workers.
	PubSubNumWorkers int
	// ReadTimeout is a timeout on read operations. Note that at moment it should be greater
	// than node ping publish interval in order to prevent timing out Pubsub connection's
	// Receive call.
	ReadTimeout time.Duration
	// WriteTimeout is a timeout on write operations.
	WriteTimeout time.Duration
	// ConnectTimeout is a timeout on connect operation.
	ConnectTimeout time.Duration
}

RedisShardConfig is struct with Redis Engine options.

type RefreshEvent

type RefreshEvent struct{}

RefreshEvent contains fields related to refresh event.

type RefreshHandler

type RefreshHandler func(RefreshEvent) RefreshReply

RefreshHandler is called when it's time to validate the client connection and update its expiration time.

type RefreshReply

type RefreshReply struct {
	ExpireAt int64
	Info     Raw
}

RefreshReply contains fields determining the reaction on refresh event.

type SockjsConfig

type SockjsConfig struct {
	// HandlerPrefix sets prefix for SockJS handler endpoint path.
	HandlerPrefix string

	// URL is URL address to SockJS client javascript library.
	URL string

	// HeartbeatDelay sets how often to send heartbeat frames to clients.
	HeartbeatDelay time.Duration

	// WebsocketReadBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	WebsocketReadBufferSize int

	// WebsocketWriteBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	WebsocketWriteBufferSize int
}

SockjsConfig represents config for SockJS handler.

type SockjsHandler

type SockjsHandler struct {
	// contains filtered or unexported fields
}

SockjsHandler accepts SockJS connections.

func NewSockjsHandler

func NewSockjsHandler(n *Node, c SockjsConfig) *SockjsHandler

NewSockjsHandler creates new SockjsHandler.

func (*SockjsHandler) ServeHTTP

func (s *SockjsHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)

type SubRefreshEvent

type SubRefreshEvent struct {
	Channel string
}

SubRefreshEvent contains fields related to subscription refresh event.

type SubRefreshHandler

type SubRefreshHandler func(SubRefreshEvent) SubRefreshReply

SubRefreshHandler is called when it's time to validate the client subscription to a channel and update its state if needed.

type SubRefreshReply

type SubRefreshReply struct {
	Expired  bool
	ExpireAt int64
	Info     Raw
}

SubRefreshReply contains fields determining the reaction on subscription refresh event.

type SubscribeEvent

type SubscribeEvent struct {
	Channel string
}

SubscribeEvent contains fields related to subscribe event.

type SubscribeHandler

type SubscribeHandler func(SubscribeEvent) SubscribeReply

SubscribeHandler is called when a client wants to subscribe to a channel.

type SubscribeReply

type SubscribeReply struct {
	Error       *Error
	Disconnect  *Disconnect
	ExpireAt    int64
	ChannelInfo Raw
}

SubscribeReply contains fields determining the reaction on subscribe event.

type Transport

type Transport interface {
	// Name returns a name of transport used for client connection.
	Name() string
	// Encoding returns transport encoding used.
	Encoding() Encoding
	// Info returns transport information.
	Info() TransportInfo
}

Transport abstracts a connection transport between server and client.

type TransportInfo

type TransportInfo struct {
	// Request contains initial HTTP request sent by client. Can be nil in case of
	// non-HTTP based transports. However, both Websocket and SockJS, which we currently
	// support, use HTTP on start, so this field will be present.
	Request *http.Request
}

TransportInfo contains extended transport description.

type UnsubscribeEvent

type UnsubscribeEvent struct {
	Channel string
}

UnsubscribeEvent contains fields related to unsubscribe event.

type UnsubscribeHandler

type UnsubscribeHandler func(UnsubscribeEvent) UnsubscribeReply

UnsubscribeHandler is called when a client has unsubscribed from a channel.

type UnsubscribeReply

type UnsubscribeReply struct {
}

UnsubscribeReply contains fields determining the reaction on unsubscribe event.

type WebsocketConfig

type WebsocketConfig struct {
	// Compression allows to enable websocket permessage-deflate
	// compression support for raw websocket connections. It does
	// not guarantee that compression will be used - i.e. it only
	// says that server will try to negotiate it with client.
	Compression bool

	// CompressionLevel sets a level for websocket compression.
	// See possible value description at https://golang.org/pkg/compress/flate/#NewWriter
	CompressionLevel int

	// CompressionMinSize allows to set minimal limit in bytes for
	// message to use compression when writing it into client connection.
	// By default it's 0, i.e. all messages will be compressed when
	// compression is enabled and negotiated with client.
	CompressionMinSize int

	// ReadBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	ReadBufferSize int

	// WriteBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	WriteBufferSize int

	// CheckOrigin func to provide custom origin check logic.
	// nil means allow all origins.
	CheckOrigin func(r *http.Request) bool
}

WebsocketConfig represents config for WebsocketHandler.
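Since a nil CheckOrigin allows all origins, an application may want to supply its own function. The sketch below builds a function with the documented signature that accepts only a fixed set of hosts. The helper names (hostAllowed, newCheckOrigin) are hypothetical, and accepting requests without an Origin header is a design choice of this sketch that suits non-browser clients but may not suit every deployment.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// hostAllowed reports whether the host part of origin is in allowed.
// An empty origin is accepted, which is a choice of this sketch.
func hostAllowed(origin string, allowed map[string]bool) bool {
	if origin == "" {
		return true
	}
	u, err := url.Parse(origin)
	if err != nil {
		return false
	}
	// u.Host includes the port when the origin has one.
	return allowed[u.Host]
}

// newCheckOrigin returns a function suitable for the CheckOrigin field.
func newCheckOrigin(hosts ...string) func(r *http.Request) bool {
	allowed := make(map[string]bool, len(hosts))
	for _, h := range hosts {
		allowed[h] = true
	}
	return func(r *http.Request) bool {
		return hostAllowed(r.Header.Get("Origin"), allowed)
	}
}

func main() {
	check := newCheckOrigin("example.com")

	r, err := http.NewRequest("GET", "http://example.com/connection/websocket", nil)
	if err != nil {
		panic(err)
	}
	r.Header.Set("Origin", "https://example.com")
	fmt.Println(check(r)) // true

	r.Header.Set("Origin", "https://evil.test")
	fmt.Println(check(r)) // false
}
```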

type WebsocketHandler

type WebsocketHandler struct {
	// contains filtered or unexported fields
}

WebsocketHandler handles websocket client connections.

func NewWebsocketHandler

func NewWebsocketHandler(n *Node, c WebsocketConfig) *WebsocketHandler

NewWebsocketHandler creates new WebsocketHandler.

func (*WebsocketHandler) ServeHTTP

func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)

Directories

Path                        Synopsis
_examples module
  custom_broker/natsbroker  Package natsbroker defines custom Nats Broker for Centrifuge library.
internal
  auth                      Package auth provides functions to generate and check Centrifugo auth signs.
  priority                  Package priority provides priority queue for libcentrifugo package Memory Engine.
  proto                     Package proto is a generated protocol buffer package.
  proto/controlproto        Package controlproto is a generated protocol buffer package.
  uuid                      Package uuid provides implementation of Universally Unique Identifier (UUID).
