centrifuge

package module
v0.18.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2021 License: MIT Imports: 41 Imported by: 92

README

Build Status codecov.io GoDoc

This library has no v1 release yet, API still evolves. Use with strict versioning. At this moment patch version updates only have backwards compatible changes and fixes, minor version updates can have backwards-incompatible API changes. See v1.0.0 milestone. Master branch can have unreleased code.

Centrifuge library is a real-time core of Centrifugo server. It's also supposed to be a general purpose real-time messaging library for Go programming language. The library built on top of strict client-server protocol schema and exposes various real-time oriented primitives for a developer. Centrifuge solves several problems a developer may come across when building complex real-time applications – like scalability (millions of connections), proper persistent connection management and invalidation, fast reconnect with message recovery, WebSocket fallback option.

Library highlights:

  • Fast and optimized for low-latency communication with millions of client connections. See test stand with 1 million connections in Kubernetes
  • Builtin bidirectional transports: WebSocket (JSON or binary Protobuf) and SockJS (JSON only)
  • Possibility to use unidirectional transports without using custom Centrifuge client library: see examples for GRPC, EventSource(SSE), Fetch Streams, Unidirectional WebSocket
  • Built-in horizontal scalability with Redis PUB/SUB, consistent Redis sharding, Sentinel and Redis Cluster for HA
  • Native authentication over HTTP middleware or custom token-based
  • Channel concept to broadcast message to all active subscribers
  • Client-side and server-side channel subscriptions
  • Bidirectional asynchronous message communication and RPC calls
  • Presence information for channels (show all active clients in a channel)
  • History information for channels (ephemeral streams with size and TTL retention)
  • Join/leave events for channels (aka client goes online/offline)
  • Possibility to register a custom PUB/SUB Broker and Presence Manager implementations
  • Message recovery mechanism for channels to survive PUB/SUB delivery problems, short network disconnects or node restart
  • Prometheus instrumentation
  • Client libraries for main application environments (see below)

For bidirectional communication between a client and a Centrifuge-based server we have a bunch of client libraries:

If you opt for a unidirectional communication then you may leverage Centrifuge possibilities without any specific library on client-side - simply by using native browser API or GRPC-generated code. See examples of unidirectional communication over GRPC, EventSource(SSE), Fetch Streams, WebSocket.

Explore Centrifuge

Installation

To install use:

go get github.com/centrifugal/centrifuge

go mod is a recommended way of adding this library to your project dependencies.

Quick example

Let's take a look on how to build the simplest real-time chat with Centrifuge library. Clients will be able to connect to a server over Websocket, send a message into a channel and this message will be instantly delivered to all active channel subscribers. On a server side we will accept all connections and will work as a simple PUB/SUB proxy without worrying too much about permissions. In this example we will use Centrifuge Javascript client on a frontend.

Create file main.go with the following code:

package main

import (
	"log"
	"net/http"

	// Import this library.
	"github.com/centrifugal/centrifuge"
)

// Authentication middleware example. Centrifuge expects Credentials
// with current user ID set. Without provided Credentials client
// connection won't be accepted. Another way to authenticate connection
// is reacting to node.OnConnecting event where you may authenticate
// connection based on a custom token sent by a client in first protocol
// frame. See _examples folder in repo to find real-life auth samples
// (OAuth2, Gin sessions, JWT etc).
func auth(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		// Put authentication Credentials into request Context.
		// Since we don't have any session backend here we simply
		// set user ID as empty string. Users with empty ID called
		// anonymous users, in real app you should decide whether
		// anonymous users allowed to connect to your server or not.
		cred := &centrifuge.Credentials{
			UserID: "",
		}
		newCtx := centrifuge.SetCredentials(ctx, cred)
		r = r.WithContext(newCtx)
		h.ServeHTTP(w, r)
	})
}

func main() {
	// We use default config here as a starting point. Default config
	// contains reasonable values for available options.
	cfg := centrifuge.DefaultConfig

	// Node is the core object in Centrifuge library responsible for
	// many useful things. For example Node allows to publish messages
	// to channels with its Publish method.
	node, err := centrifuge.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Set ConnectHandler called when client successfully connected to Node.
	// Your code inside a handler must be synchronized since it will be called
	// concurrently from different goroutines (belonging to different client
	// connections). See information about connection life cycle in library readme.
	// This handler should not block – so do minimal work here, set required
	// connection event handlers and return.
	node.OnConnect(func(client *centrifuge.Client) {
		// In our example transport will always be Websocket but it can also be SockJS.
		transportName := client.Transport().Name()
		// In our example clients connect with JSON protocol but it can also be Protobuf.
		transportProto := client.Transport().Protocol()
		log.Printf("client connected via %s (%s)", transportName, transportProto)

		// Set SubscribeHandler to react on every channel subscription attempt
		// initiated by a client. Here you can theoretically return an error or
		// disconnect a client from a server if needed. But here we just accept
		// all subscriptions to all channels. In real life you may use a more
		// complex permission check here. The reason why we use callback style
		// inside client event handlers is that it gives a possibility to control
		// operation concurrency to developer and still control order of events.
		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
			log.Printf("client subscribes on channel %s", e.Channel)
			cb(centrifuge.SubscribeReply{}, nil)
		})

		// By default, clients can not publish messages into channels. By setting
		// PublishHandler we tell Centrifuge that publish from a client-side is
		// possible. Now each time client calls publish method this handler will be
		// called and you have a possibility to validate publication request. After
		// returning from this handler Publication will be published to a channel and
		// reach active subscribers with at most once delivery guarantee. In our simple
		// chat app we allow everyone to publish into any channel but in real case
		// you may have more validation.
		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
			log.Printf("client publishes into channel %s: %s", e.Channel, string(e.Data))
			cb(centrifuge.PublishReply{}, nil)
		})

		// Set Disconnect handler to react on client disconnect events.
		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
			log.Printf("client disconnected")
		})
	})

	// Run node. This method does not block. See also node.Shutdown method
	// to finish application gracefully.
	if err := node.Run(); err != nil {
		log.Fatal(err)
	}

	// Now configure HTTP routes.

	// Serve Websocket connections using WebsocketHandler.
	wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})
	http.Handle("/connection/websocket", auth(wsHandler))

	// The second route is for serving index.html file.
	http.Handle("/", http.FileServer(http.Dir("./")))

	log.Printf("Starting server, visit http://localhost:8000")
	if err := http.ListenAndServe(":8000", nil); err != nil {
		log.Fatal(err)
	}
}

Also create file index.html near main.go with content:

<!DOCTYPE html>
<html lang="en">
    <head>
        <meta charset="utf-8">
        <script type="text/javascript" src="https://rawgit.com/centrifugal/centrifuge-js/master/dist/centrifuge.min.js"></script>
        <title>Centrifuge library chat example</title>
    </head>
    <body>
        <input type="text" id="input" />
        <script type="text/javascript">
            // Create Centrifuge object with Websocket endpoint address set in main.go
            const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket');
            function drawText(text) {
                const div = document.createElement('div');
                div.innerHTML = text + '<br>';
                document.body.appendChild(div);
            }
            centrifuge.on('connect', function(ctx){
                drawText('Connected over ' + ctx.transport);
            });
            centrifuge.on('disconnect', function(ctx){
                drawText('Disconnected: ' + ctx.reason);
            });
            const sub = centrifuge.subscribe("chat", function(ctx) {
                drawText(JSON.stringify(ctx.data));
            })
            const input = document.getElementById("input");
            input.addEventListener('keyup', function(e) {
                if (e.keyCode === 13) {
                    sub.publish(this.value);
                    input.value = '';
                }
            });
            // After setting event handlers – initiate actual connection with server.
            centrifuge.connect();
        </script>
    </body>
</html>

Then run as usual:

go run main.go

Open several browser tabs with http://localhost:8000 and see chat in action.

While this example is only the top of an iceberg, it should give you a good insight on library API. Check out examples folder for more.

Keep in mind that Centrifuge library is not a framework to build chat applications. It's a general purpose real-time transport for your messages with some helpful primitives. You can build many kinds of real-time apps on top of this library including chats but depending on application you may need to write business logic yourself.

Tips and tricks

Some useful advices about library here.

Connection life cycle

Let's describe some aspects related to connection life cycle and event handling in Centrifuge:

  • If you set middleware for transport handlers (WebsocketHandler, SockjsHandler) – then it will be called first before a client sent any command to a server and handler had a chance to start working. Just like a regular HTTP middleware. You can put Credentials to Context to authenticate connection.
  • node.OnConnecting called as soon as client sent Connect command to server. At this point no Client instance exists. You have incoming Context and Transport information. You still can authenticate Client at this point (based on string token sent from client side or any other way). Also, you can add extra data to context and return modified context to Centrifuge. Context cancelled as soon as client connection closes. This handler is synchronous and connection read loop can't proceed until you return ConnectReply.
  • node.OnConnect then called (after a reply to Connect command already written to connection). Inside OnConnect closure you have a possibility to define per-connection event handlers. If particular handler not set then client will get ErrorNotAvailable errors requesting it. Remember that none of event handlers available in Centrifuge should block forever – do minimal work, start separate goroutines if you need blocking code.
  • Client initiated request handlers called one by one from connection reading goroutine. This includes OnSubscribe, OnPublish, OnPresence, OnPresenceStats, OnHistory, client-side OnRefresh, client-side OnSubRefresh.
  • Other handlers like OnAlive, OnDisconnect, server-side OnSubRefresh, server-side OnRefresh called from separate internal goroutines.
  • OnAlive handler must not be called after OnDisconnect.
  • Client initiated request handlers can be processed asynchronously in goroutines to manage operation concurrency. This is achieved using callback functions. See concurrency example for more details.

Channel history stream

Centrifuge Broker interface supports saving Publication to history stream on publish. Depending on Broker implementation this feature can be missing though. Builtin Memory and Redis brokers support keeping Publication stream.

When using default MemoryBroker Publication stream kept in process memory and lost as soon as process restarts. RedisBroker keeps Publication stream in Redis LIST or STREAM data structures – reliability inherited from Redis configuration in this case.

Centrifuge library publication stream not meant to be used as the only source of missed Publications for a client. It mostly exists to help many clients reconnect at once (load balancer reload, application deploy) without creating a massive spike in load on your main application database. So application database still required in idiomatic use case.

Centrifuge message recovery protocol feature designed to be used together with reasonably small Publication stream size as all missed publications sent towards client in one protocol frame on resubscribe to channel.

Logging

Centrifuge library exposes logs with different log level. In your app you can set special function to handle these log entries in a way you want.

// Function to handle Centrifuge internal logs.
func handleLog(e centrifuge.LogEntry) {
	log.Printf("%s: %v", e.Message, e.Fields)
}

cfg := centrifuge.DefaultConfig
cfg.LogLevel = centrifuge.LogLevelDebug
cfg.LogHandler = handleLog

Documentation

Overview

Package centrifuge is a real-time messaging library that abstracts several bidirectional transports (Websocket, SockJS) and provides primitives to build scalable real-time applications with Go. It's also used as a core of Centrifugo server (https://github.com/centrifugal/centrifugo).

Centrifuge library provides several features on top of plain Websocket implementation - read highlights in library README on Github – https://github.com/centrifugal/centrifuge.

The API of this library is almost all goroutine-safe except cases where one-time operations like setting callback handlers performed, also your code inside event handlers should be synchronized since event handlers can be called concurrently. Library expects that code inside event handlers will not block. See more information about client connection lifetime and event handler order/concurrency in README on Github.

Also check out examples in repo to see main library concepts in action.

Index

Constants

View Source
const (
	DefaultWebsocketPingInterval     = 25 * time.Second
	DefaultWebsocketWriteTimeout     = 1 * time.Second
	DefaultWebsocketMessageSizeLimit = 65536 // 64KB
)

Defaults.

View Source
const (
	// DefaultRedisPresenceTTL is a default value for presence TTL in Redis.
	DefaultRedisPresenceTTL = 60 * time.Second
	// DefaultRedisPresenceManagerPrefix is a default value for RedisPresenceManagerConfig.Prefix.
	DefaultRedisPresenceManagerPrefix = "centrifuge"
)
View Source
const (
	DefaultRedisReadTimeout    = time.Second
	DefaultRedisWriteTimeout   = time.Second
	DefaultRedisConnectTimeout = time.Second
)
View Source
const (
	PushFlagConnect uint64 = 1 << iota
	PushFlagDisconnect
	PushFlagSubscribe
	PushFlagJoin
	PushFlagLeave
	PushFlagUnsubscribe
	PushFlagPublication
	PushFlagMessage
)

It's possible to disable certain types of pushes to be sent to a client connection by using ClientConfig.DisabledPushFlags.

View Source
const DefaultRedisBrokerPrefix = "centrifuge"

DefaultRedisBrokerPrefix is a default value for RedisBrokerConfig.Prefix.

View Source
const NoLimit = -1

NoLimit defines that limit should not be applied.

Variables

View Source
var (
	// DisconnectNormal is clean disconnect when client cleanly closed connection.
	DisconnectNormal = &Disconnect{
		Code:      3000,
		Reason:    "normal",
		Reconnect: true,
	}
	// DisconnectShutdown sent when node is going to shut down.
	DisconnectShutdown = &Disconnect{
		Code:      3001,
		Reason:    "shutdown",
		Reconnect: true,
	}
	// DisconnectInvalidToken sent when client came with invalid token.
	DisconnectInvalidToken = &Disconnect{
		Code:      3002,
		Reason:    "invalid token",
		Reconnect: false,
	}
	// DisconnectBadRequest sent when client uses malformed protocol
	// frames or wrong order of commands.
	DisconnectBadRequest = &Disconnect{
		Code:      3003,
		Reason:    "bad request",
		Reconnect: false,
	}
	// DisconnectServerError sent when internal error occurred on server.
	DisconnectServerError = &Disconnect{
		Code:      3004,
		Reason:    "internal server error",
		Reconnect: true,
	}
	// DisconnectExpired sent when client connection expired.
	DisconnectExpired = &Disconnect{
		Code:      3005,
		Reason:    "expired",
		Reconnect: true,
	}
	// DisconnectSubExpired sent when client subscription expired.
	DisconnectSubExpired = &Disconnect{
		Code:      3006,
		Reason:    "subscription expired",
		Reconnect: true,
	}
	// DisconnectStale sent to close connection that did not become
	// authenticated in configured interval after dialing.
	DisconnectStale = &Disconnect{
		Code:      3007,
		Reason:    "stale",
		Reconnect: false,
	}
	// DisconnectSlow sent when client can't read messages fast enough.
	DisconnectSlow = &Disconnect{
		Code:      3008,
		Reason:    "slow",
		Reconnect: true,
	}
	// DisconnectWriteError sent when an error occurred while writing to
	// client connection.
	DisconnectWriteError = &Disconnect{
		Code:      3009,
		Reason:    "write error",
		Reconnect: true,
	}
	// DisconnectInsufficientState sent when server detects wrong client
	// position in channel Publication stream. Disconnect allows client
	// to restore missed publications on reconnect.
	DisconnectInsufficientState = &Disconnect{
		Code:      3010,
		Reason:    "insufficient state",
		Reconnect: true,
	}
	// DisconnectForceReconnect sent when server disconnects connection.
	DisconnectForceReconnect = &Disconnect{
		Code:      3011,
		Reason:    "force reconnect",
		Reconnect: true,
	}
	// DisconnectForceNoReconnect sent when server disconnects connection
	// and asks it to not reconnect again.
	DisconnectForceNoReconnect = &Disconnect{
		Code:      3012,
		Reason:    "force disconnect",
		Reconnect: false,
	}
	// DisconnectConnectionLimit can be sent when client connection exceeds a
	// configured connection limit (per user ID or due to other rule).
	DisconnectConnectionLimit = &Disconnect{
		Code:      3013,
		Reason:    "connection limit",
		Reconnect: false,
	}
	// DisconnectChannelLimit can be sent when client connection exceeds a
	// configured channel limit.
	DisconnectChannelLimit = &Disconnect{
		Code:      3013,
		Reason:    "channel limit",
		Reconnect: false,
	}
)

Some predefined disconnect structures used by library internally. Though it's always possible to create Disconnect with any field values on the fly. Library users supposed to use codes in range 4000-4999 for custom disconnects.

View Source
var (
	// ErrorInternal means server error, if returned this is a signal
	// that something went wrong with server itself and client most probably
	// not guilty.
	ErrorInternal = &Error{
		Code:    100,
		Message: "internal server error",
	}
	// ErrorUnauthorized says that request is unauthorized.
	ErrorUnauthorized = &Error{
		Code:    101,
		Message: "unauthorized",
	}
	// ErrorUnknownChannel means that channel name does not exist.
	ErrorUnknownChannel = &Error{
		Code:    102,
		Message: "unknown channel",
	}
	// ErrorPermissionDenied means that access to resource not allowed.
	ErrorPermissionDenied = &Error{
		Code:    103,
		Message: "permission denied",
	}
	// ErrorMethodNotFound means that method sent in command does not exist.
	ErrorMethodNotFound = &Error{
		Code:    104,
		Message: "method not found",
	}
	// ErrorAlreadySubscribed returned when client wants to subscribe on channel
	// it already subscribed to.
	ErrorAlreadySubscribed = &Error{
		Code:    105,
		Message: "already subscribed",
	}
	// ErrorLimitExceeded says that some sort of limit exceeded, server logs should
	// give more detailed information. See also ErrorTooManyRequests which is more
	// specific for rate limiting purposes.
	ErrorLimitExceeded = &Error{
		Code:    106,
		Message: "limit exceeded",
	}
	// ErrorBadRequest says that server can not process received
	// data because it is malformed. Retrying request does not make sense.
	ErrorBadRequest = &Error{
		Code:    107,
		Message: "bad request",
	}
	// ErrorNotAvailable means that resource is not enabled.
	ErrorNotAvailable = &Error{
		Code:    108,
		Message: "not available",
	}
	// ErrorTokenExpired indicates that connection token expired.
	ErrorTokenExpired = &Error{
		Code:    109,
		Message: "token expired",
	}
	// ErrorExpired indicates that connection expired (no token involved).
	ErrorExpired = &Error{
		Code:    110,
		Message: "expired",
	}
	// ErrorTooManyRequests means that server rejected request due to
	// its rate limiting strategies.
	ErrorTooManyRequests = &Error{
		Code:    111,
		Message: "too many requests",
	}
	// ErrorUnrecoverablePosition means that stream does not contain required
	// range of publications to fulfill a history query. This can be happen to
	// expiration, size limitation or due to wrong epoch.
	ErrorUnrecoverablePosition = &Error{
		Code:    112,
		Message: "unrecoverable position",
	}
)

Here we define well-known errors that can be used in client protocol replies. Library user can define own application specific errors. When define new custom error it is recommended to use error codes > 1000 assuming that codes in interval 0-999 reserved by Centrifuge.

View Source
var DefaultConfig = Config{
	NodeInfoMetricsAggregateInterval: 60 * time.Second,
	ClientPresenceUpdateInterval:     25 * time.Second,
	ClientExpiredCloseDelay:          25 * time.Second,
	ClientExpiredSubCloseDelay:       25 * time.Second,
	ClientStaleCloseDelay:            25 * time.Second,
	ClientChannelPositionCheckDelay:  40 * time.Second,
	ClientQueueMaxSize:               10485760,
	ClientChannelLimit:               128,
	ChannelMaxLength:                 255,
}

DefaultConfig is Config initialized with default values for all fields.

Functions

func LogLevelToString

func LogLevelToString(l LogLevel) string

LogLevelToString transforms Level to its string representation.

func NewClient added in v0.1.0

func NewClient(ctx context.Context, n *Node, t Transport) (*Client, ClientCloseFunc, error)

NewClient initializes new Client.

func SetCredentials

func SetCredentials(ctx context.Context, cred *Credentials) context.Context

SetCredentials allows setting connection Credentials to Context. Credentials set to Context in authentication middleware will be used by Centrifuge library to authenticate user.

Types

type AliveHandler added in v0.10.0

type AliveHandler func()

AliveHandler called periodically while connection alive. This is a helper to do periodic things which can tolerate some approximation in time. This callback will run every ClientPresenceUpdateInterval and can save you a timer.

type Broker

type Broker interface {
	// Run called once on start when broker already set to node. At
	// this moment node is ready to process broker events.
	Run(BrokerEventHandler) error

	// Subscribe node on channel to listen all messages coming from channel.
	Subscribe(ch string) error
	// Unsubscribe node from channel to stop listening messages from it.
	Unsubscribe(ch string) error

	// Publish allows to send data into channel. Data should be
	// delivered to all clients subscribed to this channel at moment on any
	// Centrifuge node (with at most once delivery guarantee).
	//
	// Broker can optionally maintain publication history inside channel according
	// to PublishOptions provided. See History method for rules that should be implemented
	// for accessing Publications from history stream.
	//
	// Saving message to a history stream and publish to PUB/SUB should be an atomic
	// operation per channel.
	//
	// StreamPosition returned here describes current stream top offset and epoch.
	// For channels without history this StreamPosition should be empty.
	Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error)
	// PublishJoin publishes Join Push message into channel.
	PublishJoin(ch string, info *ClientInfo) error
	// PublishLeave publishes Leave Push message into channel.
	PublishLeave(ch string, info *ClientInfo) error
	// PublishControl allows to send control command data. If nodeID is empty string
	// then message should be delivered to all running nodes, if nodeID is set then
	// message should be delivered only to node with specified ID.
	PublishControl(data []byte, nodeID, shardKey string) error

	// History used to extract Publications from history stream.
	// Publications returned according to HistoryFilter which allows to set several
	// filtering options. StreamPosition returned describes current history stream
	// top offset and epoch.
	History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error)
	// RemoveHistory removes history from channel. This is in general not
	// needed as history expires automatically (based on history_lifetime)
	// but sometimes can be useful for application logic.
	RemoveHistory(ch string) error
}

Broker is responsible for PUB/SUB mechanics.

type BrokerEventHandler

type BrokerEventHandler interface {
	// HandlePublication to handle received Publications.
	HandlePublication(ch string, pub *Publication, sp StreamPosition) error
	// HandleJoin to handle received Join messages.
	HandleJoin(ch string, info *ClientInfo) error
	// HandleLeave to handle received Leave messages.
	HandleLeave(ch string, info *ClientInfo) error
	// HandleControl to handle received control data.
	HandleControl(data []byte) error
}

BrokerEventHandler can handle messages received from PUB/SUB system.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents client connection to server.

func (*Client) Channels

func (c *Client) Channels() []string

Channels returns a slice of channels client connection currently subscribed to.

func (*Client) Connect added in v0.16.0

func (c *Client) Connect(req ConnectRequest)

Connect supposed to be called from unidirectional transport layer to pass initial information about connection and thus initiate Node.OnConnecting event. Bidirectional transport initiate connecting workflow automatically since client passes Connect command upon successful connection establishment with a server.

func (*Client) Context added in v0.10.0

func (c *Client) Context() context.Context

Context returns client Context. This context will be canceled as soon as client connection closes.

func (*Client) Disconnect added in v0.10.0

func (c *Client) Disconnect(disconnect *Disconnect)

Disconnect client connection with specific disconnect code and reason. This method internally creates a new goroutine at moment to do closing stuff. An extra goroutine is required to solve disconnect and alive callback ordering/sync problems. Will be a noop if client already closed. As this method runs a separate goroutine client connection will be closed eventually (i.e. not immediately).

func (*Client) Handle added in v0.1.0

func (c *Client) Handle(data []byte) bool

Handle raw data encoded with Centrifuge protocol. Not goroutine-safe. Supposed to be called only from a transport connection reader.

func (*Client) ID

func (c *Client) ID() string

ID returns unique client connection id.

func (*Client) Info added in v0.18.0

func (c *Client) Info() []byte

Info returns connection info.

func (*Client) IsSubscribed added in v0.11.0

func (c *Client) IsSubscribed(ch string) bool

IsSubscribed returns true if client subscribed to a channel.

func (*Client) OnAlive added in v0.13.0

func (c *Client) OnAlive(h AliveHandler)

OnAlive allows setting AliveHandler. AliveHandler called periodically for active client connection.

func (*Client) OnDisconnect added in v0.13.0

func (c *Client) OnDisconnect(h DisconnectHandler)

OnDisconnect allows setting DisconnectHandler. DisconnectHandler called when client disconnected.

func (*Client) OnHistory added in v0.13.0

func (c *Client) OnHistory(h HistoryHandler)

OnHistory allows settings HistoryHandler. HistoryHandler called when History request from client received. At this moment you can only return a custom error or disconnect client.

func (*Client) OnMessage added in v0.13.0

func (c *Client) OnMessage(h MessageHandler)

OnMessage allows setting MessageHandler. MessageHandler called when client sent asynchronous message.

func (*Client) OnPresence added in v0.13.0

func (c *Client) OnPresence(h PresenceHandler)

OnPresence allows setting PresenceHandler. PresenceHandler called when Presence request from client received. At this moment you can only return a custom error or disconnect client.

func (*Client) OnPresenceStats added in v0.13.0

func (c *Client) OnPresenceStats(h PresenceStatsHandler)

OnPresenceStats allows settings PresenceStatsHandler. PresenceStatsHandler called when Presence Stats request from client received. At this moment you can only return a custom error or disconnect client.

func (*Client) OnPublish added in v0.13.0

func (c *Client) OnPublish(h PublishHandler)

OnPublish allows setting PublishHandler. PublishHandler called when client publishes message into channel.

func (*Client) OnRPC added in v0.13.0

func (c *Client) OnRPC(h RPCHandler)

OnRPC allows setting RPCHandler. RPCHandler will be executed on every incoming RPC call.

func (*Client) OnRefresh added in v0.13.0

func (c *Client) OnRefresh(h RefreshHandler)

OnRefresh allows setting RefreshHandler. RefreshHandler called when it's time to refresh expiring client connection.

func (*Client) OnSubRefresh added in v0.13.0

func (c *Client) OnSubRefresh(h SubRefreshHandler)

OnSubRefresh allows setting SubRefreshHandler. SubRefreshHandler called when it's time to refresh client subscription.

func (*Client) OnSubscribe added in v0.13.0

func (c *Client) OnSubscribe(h SubscribeHandler)

OnSubscribe allows setting SubscribeHandler. SubscribeHandler called when client subscribes on a channel.

func (*Client) OnUnsubscribe added in v0.13.0

func (c *Client) OnUnsubscribe(h UnsubscribeHandler)

OnUnsubscribe allows setting UnsubscribeHandler. UnsubscribeHandler called when client unsubscribes from channel.

func (*Client) Refresh added in v0.18.0

func (c *Client) Refresh(opts ...RefreshOption) error

func (*Client) Send

func (c *Client) Send(data []byte) error

Send data to client. This sends an asynchronous message – data will be just written to connection. on client side this message can be handled with Message handler.

func (*Client) Subscribe added in v0.5.0

func (c *Client) Subscribe(channel string, opts ...SubscribeOption) error

Subscribe client to a channel.

func (*Client) Transport

func (c *Client) Transport() TransportInfo

Transport returns client connection transport information.

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(ch string) error

Unsubscribe allows to unsubscribe client from channel.

func (*Client) UserID

func (c *Client) UserID() string

UserID returns user id associated with client connection.

type ClientCloseFunc added in v0.10.0

type ClientCloseFunc func() error

ClientCloseFunc must be called on Transport handler close to clean up Client.

type ClientInfo

type ClientInfo struct {
	// ClientID is a client unique id.
	ClientID string
	// UserID is an ID of authenticated user. Zero value means anonymous user.
	UserID string
	// ConnInfo is an additional information about connection.
	ConnInfo []byte
	// ChanInfo is an additional information about connection in context of
	// channel subscription.
	ChanInfo []byte
}

ClientInfo contains information about client connection.

type Closer

type Closer interface {
	// Close when called should clean up used resources.
	Close(ctx context.Context) error
}

Closer is an interface that Broker and PresenceManager can optionally implement if they need to close any resources on Centrifuge Node graceful shutdown.

type Config

type Config struct {
	// Version of server – will be sent to a client on connection establishment
	// phase in reply to connect command from a client.
	Version string
	// Name is a unique name of the current server Node. Name used as human-readable
	// and meaningful node identifier. If not set then os.Hostname will be used.
	Name string
	// LogLevel is a log level. By default, nothing will be logged by Centrifuge.
	LogLevel LogLevel
	// LogHandler is a handler function Node will send logs to.
	LogHandler LogHandler
	// NodeInfoMetricsAggregateInterval sets interval for automatic metrics
	// aggregation. It's not reasonable to have it less than one second.
	NodeInfoMetricsAggregateInterval time.Duration
	// ClientPresenceUpdateInterval sets an interval how often connected
	// clients update presence information.
	ClientPresenceUpdateInterval time.Duration
	// ClientExpiredCloseDelay is an extra time given to client to refresh
	// its connection in the end of connection TTL. At moment only used for
	// a client-side refresh workflow.
	ClientExpiredCloseDelay time.Duration
	// ClientExpiredSubCloseDelay is an extra time given to client to
	// refresh its expiring subscription in the end of subscription TTL.
	// At the moment only used for a client-side subscription refresh workflow.
	ClientExpiredSubCloseDelay time.Duration
	// ClientStaleCloseDelay is a timeout after which connection will be
	// closed if still not authenticated (i.e. no valid connect command
	// received yet).
	ClientStaleCloseDelay time.Duration
	// ClientChannelPositionCheckDelay defines minimal time from previous
	// client position check in channel. If client does not pass check it will
	// be disconnected with DisconnectInsufficientState.
	ClientChannelPositionCheckDelay time.Duration
	// ClientQueueMaxSize is a maximum size of client's message queue in bytes.
	// After this queue size exceeded Centrifuge closes client's connection.
	ClientQueueMaxSize int
	// ClientChannelLimit sets upper limit of client-side channels each client
	// can subscribe to. Client-side subscriptions attempts will get an ErrorLimitExceeded
	// in subscribe reply. Server-side subscriptions above limit will result into
	// DisconnectChannelLimit.
	ClientChannelLimit int
	// UserConnectionLimit limits number of client connections to single Node
	// from user with the same ID. Zero value means unlimited. Anonymous users
	// can't be tracked.
	UserConnectionLimit int
	// ChannelMaxLength is the maximum length of a channel name. This is only checked
	// for client-side subscription requests.
	ChannelMaxLength int
	// MetricsNamespace is a Prometheus metrics namespace to use for internal metrics.
	// If not set then the default namespace name `centrifuge` will be used.
	MetricsNamespace string
	// HistoryMaxPublicationLimit allows limiting the maximum number of publications to be
	// asked over client API history call. This is useful when you have large streams and
	// want to prevent a massive number of missed messages to be sent to a client when
	// calling history without any limit explicitly set. By default no limit used.
	// This option does not affect Node.History method. See also RecoveryMaxPublicationLimit.
	HistoryMaxPublicationLimit int
	// RecoveryMaxPublicationLimit allows limiting the number of Publications that could be
	// restored during the automatic recovery process. See also HistoryMaxPublicationLimit.
	RecoveryMaxPublicationLimit int
	// UseSingleFlight allows turning on mode where singleflight will be automatically used for
	// Node.History (including recovery) and Node.Presence/Node.PresenceStats calls.
	UseSingleFlight bool
}

Config contains Node configuration options.

type ConnectEvent

type ConnectEvent struct {
	// ClientID that was generated by library for client connection.
	ClientID string
	// Token received from client as part of Connect Command.
	Token string
	// Data received from client as part of Connect Command.
	Data []byte
	// Name can contain client name if provided on connect.
	Name string
	// Version can contain client version if provided on connect.
	Version string
	// Transport contains information about transport used by client.
	Transport TransportInfo
	// Channels is a list of channels a client wants to subscribe to
	// (server-side). It's just a way for a client to provide this list.
	// Server should use ConnectReply.Subscriptions to tell Centrifuge
	// the final list of server-side subscriptions for a connection which
	// can differ from the Channels list.
	Channels []string
}

ConnectEvent contains fields related to connecting event (when a server received Connect protocol command from client).

type ConnectHandler

type ConnectHandler func(*Client)

ConnectHandler called when client connected to server and ready to communicate.

type ConnectReply

type ConnectReply struct {
	// Context allows to return modified context.
	Context context.Context
	// Credentials should be set if app wants to authenticate connection.
	// This field is optional since auth Credentials could be set through
	// HTTP middleware.
	Credentials *Credentials
	// Data allows to set custom data in connect reply.
	Data []byte
	// Subscriptions map contains channels to subscribe connection to on server-side.
	Subscriptions map[string]SubscribeOptions
	// ClientSideRefresh tells library to use client-side refresh logic:
	// i.e. send refresh commands with new connection token. If not set
	// then server-side refresh mechanism will be used.
	ClientSideRefresh bool
}

ConnectReply contains reaction to ConnectEvent.

type ConnectRequest added in v0.16.0

type ConnectRequest struct {
	// Token is an optional token from a client.
	Token string
	// Data is an optional custom data from a client.
	Data []byte
	// Name of a client.
	Name string
	// Version of a client.
	Version string
	// Subs is a map with channel subscription state (for recovery on connect).
	Subs map[string]SubscribeRequest
}

ConnectRequest can be used in a unidirectional connection case to pass initial connection information from a client-side.

type ConnectingHandler added in v0.0.2

type ConnectingHandler func(context.Context, ConnectEvent) (ConnectReply, error)

ConnectingHandler called when new client authenticates on server.

type Credentials

type Credentials struct {
	// UserID tells library an ID of current user. Leave this empty string
	// if you need access from anonymous user.
	UserID string
	// ExpireAt allows to set time in future when connection must be validated.
	// In this case Client.OnRefresh callback must be set by application. Zero
	// value means no expiration.
	ExpireAt int64
	// Info contains additional information about connection. This data will be
	// included untouched into Join/Leave messages, into Presence information,
	// also info can become a part of published message as part of ClientInfo.
	// In some cases having additional info can be an undesired overhead – but
	// you are simply free to not use this field at all.
	Info []byte
}

Credentials allow authenticating connection when set into context.

func GetCredentials added in v0.5.0

func GetCredentials(ctx context.Context) (*Credentials, bool)

GetCredentials allows extracting Credentials from Context (if set previously).

type Disconnect

type Disconnect struct {
	// Code is disconnect code.
	Code uint32 `json:"code,omitempty"`
	// Reason is a short description of disconnect.
	Reason string `json:"reason"`
	// Reconnect gives client an advice to reconnect after disconnect or not.
	Reconnect bool `json:"reconnect"`
	// contains filtered or unexported fields
}

Disconnect allows to configure how client will be disconnected from server. The important note that Disconnect serialized to JSON must be less than 127 bytes due to WebSocket protocol limitations (because at moment we send Disconnect inside reason field of WebSocket close handshake). Note that due to performance reasons we cache Disconnect text representation for Close Frame on first send to client so changing field values inside existing Disconnect instance won't be reflected in WebSocket/Sockjs Close frames.

func (*Disconnect) CloseText added in v0.8.2

func (d *Disconnect) CloseText() string

CloseText allows to build disconnect advice sent inside Close frame. At moment we don't encode Code here to not duplicate information since it is sent separately as Code of WebSocket/SockJS Close Frame.

func (*Disconnect) Error added in v0.10.0

func (d *Disconnect) Error() string

Error representation.

func (*Disconnect) String added in v0.8.2

func (d *Disconnect) String() string

String representation.

type DisconnectEvent

type DisconnectEvent struct {
	// Disconnect can optionally contain a custom disconnect object that
	// was sent from server to client with closing handshake. If this field
	// exists then client connection was closed from server. If this field
	// is nil then this means that client disconnected normally and connection
	// closing was initiated by client side.
	Disconnect *Disconnect
}

DisconnectEvent contains fields related to disconnect event.

type DisconnectHandler

type DisconnectHandler func(DisconnectEvent)

DisconnectHandler called when client disconnects from server. The important thing to remember is that you should not rely entirely on this handler to clean up non-expiring resources (in your database for example). Why? Because in case of any non-graceful node shutdown (kill -9, process crash, machine lost) disconnect handler will never be called (obviously) so you can have stale data.

type DisconnectOption added in v0.8.0

type DisconnectOption func(options *DisconnectOptions)

DisconnectOption is a type to represent various Disconnect options.

func WithDisconnect added in v0.14.0

func WithDisconnect(disconnect *Disconnect) DisconnectOption

WithDisconnect allows to set custom Disconnect.

func WithDisconnectClient added in v0.18.0

func WithDisconnectClient(clientID string) DisconnectOption

WithDisconnectClient allows to set Client.

func WithDisconnectClientWhitelist added in v0.18.0

func WithDisconnectClientWhitelist(whitelist []string) DisconnectOption

WithDisconnectClientWhitelist allows to set ClientWhitelist.

type DisconnectOptions added in v0.8.0

type DisconnectOptions struct {
	// Disconnect represents custom disconnect to use.
	// By default DisconnectForceNoReconnect will be used.
	Disconnect *Disconnect
	// ClientWhitelist contains client IDs to keep.
	ClientWhitelist []string
	// contains filtered or unexported fields
}

DisconnectOptions define some fields to alter behaviour of Disconnect operation.

type Error

type Error struct {
	Code    uint32
	Message string
}

Error represents client reply error.

func (Error) Error added in v0.8.0

func (e Error) Error() string

type HistoryCallback added in v0.13.0

type HistoryCallback func(HistoryReply, error)

HistoryCallback should be called with HistoryReply or error.

type HistoryEvent added in v0.10.0

type HistoryEvent struct {
	Channel string
	Filter  HistoryFilter
}

HistoryEvent has channel operation called for.

type HistoryFilter

type HistoryFilter struct {
	// Since used to extract publications from stream since provided StreamPosition.
	Since *StreamPosition
	// Limit number of publications to return.
	// -1 means no limit - i.e. return all publications currently in stream.
	// 0 means that caller only interested in current stream top position so
	// Broker should not return any publications.
	Limit int
	// Reverse direction.
	Reverse bool
}

HistoryFilter allows to filter history according to fields set.

type HistoryHandler added in v0.10.0

type HistoryHandler func(HistoryEvent, HistoryCallback)

HistoryHandler must handle incoming command from client.

type HistoryOption added in v0.8.0

type HistoryOption func(options *HistoryOptions)

HistoryOption is a type to represent various History options.

func WithLimit added in v0.8.0

func WithLimit(limit int) HistoryOption

WithLimit allows to set HistoryOptions.Limit.

func WithReverse added in v0.18.0

func WithReverse(reverse bool) HistoryOption

WithSince allows to set HistoryOptions.Since option.

func WithSince added in v0.17.0

func WithSince(sp *StreamPosition) HistoryOption

WithSince allows to set HistoryOptions.Since option.

type HistoryOptions added in v0.8.0

type HistoryOptions struct {
	// Since used to extract publications from stream since provided StreamPosition.
	Since *StreamPosition
	// Limit number of publications to return.
	// -1 means no limit - i.e. return all publications currently in stream.
	// 0 means that caller only interested in current stream top position so
	// Broker should not return any publications in result.
	// Positive integer does what it should.
	Limit int
	// Reverse direction
	Reverse bool
}

HistoryOptions define some fields to alter History method behaviour.

type HistoryReply added in v0.10.0

type HistoryReply struct {
	Result *HistoryResult
}

HistoryReply contains fields determining the reaction on history request.

type HistoryResult added in v0.8.0

type HistoryResult struct {
	// StreamPosition embedded here describes current stream top offset and epoch.
	StreamPosition
	// Publications extracted from history storage according to HistoryFilter.
	Publications []*Publication
}

HistoryResult contains Publications and current stream top StreamPosition.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub tracks Client connections on the current Node.

func (*Hub) BroadcastPublication added in v0.17.1

func (h *Hub) BroadcastPublication(ch string, pub *Publication, sp StreamPosition) error

BroadcastPublication sends message to all clients subscribed on a channel on the current Node. Usually this is NOT what you need since in most cases you should use Node.Publish method which uses a Broker to deliver publications to all Nodes in a cluster and maintains publication history in a channel with incremental offset. By calling BroadcastPublication messages will only be sent to the current node subscribers without any defined offset semantics.

func (*Hub) Channels

func (h *Hub) Channels() []string

Channels returns a slice of all active channels.

func (*Hub) NumChannels

func (h *Hub) NumChannels() int

NumChannels returns a total number of different channels.

func (*Hub) NumClients

func (h *Hub) NumClients() int

NumClients returns total number of client connections.

func (*Hub) NumSubscribers

func (h *Hub) NumSubscribers(ch string) int

NumSubscribers returns number of current subscribers for a given channel.

func (*Hub) NumSubscriptions added in v0.18.0

func (h *Hub) NumSubscriptions() int

NumSubscriptions returns a total number of subscriptions.

func (*Hub) NumUsers

func (h *Hub) NumUsers() int

NumUsers returns a number of unique users connected.

func (*Hub) UserConnections added in v0.18.0

func (h *Hub) UserConnections(userID string) map[string]*Client

UserConnections returns all user connections to the current Node.

type Info

type Info struct {
	Nodes []NodeInfo
}

Info contains information about all known server nodes.

type LogEntry

type LogEntry struct {
	Level   LogLevel
	Message string
	Fields  map[string]interface{}
}

LogEntry represents log entry.

func NewLogEntry

func NewLogEntry(level LogLevel, message string, fields ...map[string]interface{}) LogEntry

NewLogEntry creates new LogEntry.

type LogHandler

type LogHandler func(LogEntry)

LogHandler handles log entries - i.e. writes into correct destination if necessary.

type LogLevel

type LogLevel int

LogLevel describes the chosen log level.

const (
	// LogLevelNone means no logging.
	LogLevelNone LogLevel = iota
	// LogLevelTrace turns on trace logs - should only be used during development. This
	// log level shows all client-server communication.
	LogLevelTrace
	// LogLevelDebug turns on debug logs - its generally too much for production in normal
	// conditions but can help when developing and investigating problems in production.
	LogLevelDebug
	// LogLevelInfo is logs useful server information. This includes various information
	// about problems with client connections which is not Centrifuge errors but
	// in most situations malformed client behaviour.
	LogLevelInfo
	// LogLevelError level logs only server errors. This is logging that means non-working
	// Centrifuge and maybe effort from developers/administrators to make things
	// work again.
	LogLevelError
)

type MemoryBroker added in v0.16.0

type MemoryBroker struct {
	// contains filtered or unexported fields
}

MemoryBroker is builtin default Broker which allows to run Centrifuge-based server without any external broker. All data managed inside process memory.

With this Broker you can only run single Centrifuge node. If you need to scale you should consider using another Broker implementation instead – for example RedisBroker.

Running single node can be sufficient for many use cases especially when you need maximum performance and not too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.

func NewMemoryBroker added in v0.16.0

func NewMemoryBroker(n *Node, c MemoryBrokerConfig) (*MemoryBroker, error)

NewMemoryBroker initializes MemoryBroker.

func (*MemoryBroker) Close added in v0.17.1

func (b *MemoryBroker) Close(_ context.Context) error

Close is noop for now.

func (*MemoryBroker) History added in v0.16.0

func (b *MemoryBroker) History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error)

History - see Broker interface description.

func (*MemoryBroker) Publish added in v0.16.0

func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error)

Publish adds message into history hub and calls node method to handle message. We don't have any PUB/SUB here as Memory Engine is single node only.

func (*MemoryBroker) PublishControl added in v0.16.0

func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error

PublishControl - see Broker interface description.

func (*MemoryBroker) PublishJoin added in v0.16.0

func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error

PublishJoin - see Broker interface description.

func (*MemoryBroker) PublishLeave added in v0.16.0

func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error

PublishLeave - see Broker interface description.

func (*MemoryBroker) RemoveHistory added in v0.16.0

func (b *MemoryBroker) RemoveHistory(ch string) error

RemoveHistory - see Broker interface description.

func (*MemoryBroker) Run added in v0.16.0

Run runs memory broker.

func (*MemoryBroker) Subscribe added in v0.16.0

func (b *MemoryBroker) Subscribe(_ string) error

Subscribe is noop here.

func (*MemoryBroker) Unsubscribe added in v0.16.0

func (b *MemoryBroker) Unsubscribe(_ string) error

Unsubscribe node from channel. Noop here.

type MemoryBrokerConfig added in v0.16.0

type MemoryBrokerConfig struct {
	// HistoryMetaTTL sets a time of inactive stream meta information expiration.
	// This information contains an epoch and offset of each stream. Having this
	// meta information helps in message recovery process.
	// Must have a reasonable value for application.
	// At moment works with seconds precision.
	// TODO v1: since we have epoch, things should also properly work without meta
	// information at all (but we loose possibility of long-term recover in stream
	// without new messages). We can make this optional and disabled by default at
	// least.
	HistoryMetaTTL time.Duration
}

MemoryBrokerConfig is a memory broker config.

type MemoryPresenceManager added in v0.16.0

type MemoryPresenceManager struct {
	// contains filtered or unexported fields
}

MemoryPresenceManager is builtin default PresenceManager which allows to run Centrifuge-based server without any external storage. All data managed inside process memory.

With this PresenceManager you can only run single Centrifuge node. If you need to scale you should consider using another PresenceManager implementation instead – for example RedisPresenceManager.

Running single node can be sufficient for many use cases especially when you need maximum performance and not too many online clients. Consider configuring your load balancer to have one backup Centrifuge node for HA in this case.

func NewMemoryPresenceManager added in v0.16.0

func NewMemoryPresenceManager(n *Node, c MemoryPresenceManagerConfig) (*MemoryPresenceManager, error)

NewMemoryPresenceManager initializes MemoryPresenceManager.

func (*MemoryPresenceManager) AddPresence added in v0.16.0

func (m *MemoryPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error

AddPresence - see PresenceManager interface description.

func (*MemoryPresenceManager) Close added in v0.17.1

Close is noop for now.

func (*MemoryPresenceManager) Presence added in v0.16.0

func (m *MemoryPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)

Presence - see PresenceManager interface description.

func (*MemoryPresenceManager) PresenceStats added in v0.16.0

func (m *MemoryPresenceManager) PresenceStats(ch string) (PresenceStats, error)

PresenceStats - see PresenceManager interface description.

func (*MemoryPresenceManager) RemovePresence added in v0.16.0

func (m *MemoryPresenceManager) RemovePresence(ch string, uid string) error

RemovePresence - see PresenceManager interface description.

type MemoryPresenceManagerConfig added in v0.16.0

type MemoryPresenceManagerConfig struct{}

MemoryPresenceManagerConfig is a MemoryPresenceManager config.

type MessageEvent

type MessageEvent struct {
	// Data contains message untouched payload.
	Data []byte
}

MessageEvent contains fields related to message request.

type MessageHandler

type MessageHandler func(MessageEvent)

MessageHandler must handle incoming async message from client.

type Metrics

type Metrics struct {
	Interval float64
	Items    map[string]float64
}

Metrics aggregation over time interval for node.

type Node

type Node struct {
	// contains filtered or unexported fields
}

Node is a heart of Centrifuge library – it keeps and manages client connections, maintains information about other Centrifuge nodes in cluster, keeps references to common things (like Broker and PresenceManager, Hub) etc. By default Node uses in-memory implementations of Broker and PresenceManager - MemoryBroker and MemoryPresenceManager which allow running a single Node only. To scale use other implementations of Broker and PresenceManager like builtin RedisBroker and RedisPresenceManager.

func New

func New(c Config) (*Node, error)

New creates Node with provided Config.

func (*Node) Disconnect

func (n *Node) Disconnect(userID string, opts ...DisconnectOption) error

Disconnect allows closing all user connections on all nodes.

func (*Node) History

func (n *Node) History(ch string, opts ...HistoryOption) (HistoryResult, error)

History allows to extract Publications in channel. The channel must belong to namespace where history is on.

func (*Node) Hub

func (n *Node) Hub() *Hub

Hub returns node's Hub.

func (*Node) ID added in v0.15.0

func (n *Node) ID() string

ID returns unique Node identifier. This is a UUID v4 value.

func (*Node) Info

func (n *Node) Info() (Info, error)

Info returns aggregated stats from all nodes.

func (*Node) Log

func (n *Node) Log(entry LogEntry)

Log allows to log entry.

func (*Node) LogEnabled

func (n *Node) LogEnabled(level LogLevel) bool

LogEnabled allows to log entry.

func (*Node) Notify added in v0.17.0

func (n *Node) Notify(op string, data []byte, toNodeID string) error

Notify allows sending an asynchronous notification to all other nodes (or to a single specific node). Unlike Survey it does not wait for any response. If toNodeID is not an empty string then a notification will be sent to a concrete node in cluster, otherwise a notification sent to all running nodes. See a corresponding Node.OnNotification method to handle received notifications.

func (*Node) NotifyShutdown

func (n *Node) NotifyShutdown() chan struct{}

NotifyShutdown returns a channel which will be closed on node shutdown.

func (*Node) OnConnect added in v0.10.0

func (n *Node) OnConnect(handler ConnectHandler)

OnConnect allows setting ConnectHandler. ConnectHandler called after client connection successfully established, authenticated and Connect Reply already sent to client. This is a place where application can start communicating with client.

func (*Node) OnConnecting added in v0.10.0

func (n *Node) OnConnecting(handler ConnectingHandler)

OnConnecting allows setting ConnectingHandler. ConnectingHandler will be called when client sends Connect command to server. In this handler server can reject connection or provide Credentials for it.

func (*Node) OnNodeInfoSend added in v0.18.0

func (n *Node) OnNodeInfoSend(handler NodeInfoSendHandler)

OnNodeInfoSend allows setting NodeInfoSendHandler. This should be done before Node.Run called.

func (*Node) OnNotification added in v0.17.0

func (n *Node) OnNotification(handler NotificationHandler)

OnNotification allows setting NotificationHandler. This should be done before Node.Run called.

func (*Node) OnSurvey added in v0.15.0

func (n *Node) OnSurvey(handler SurveyHandler)

OnSurvey allows setting SurveyHandler. This should be done before Node.Run called.

func (*Node) OnTransportWrite added in v0.18.0

func (n *Node) OnTransportWrite(handler TransportWriteHandler)

OnTransportWrite allows setting TransportWriteHandler. This should be done before Node.Run called.

func (*Node) Presence

func (n *Node) Presence(ch string) (PresenceResult, error)

Presence returns a map with information about active clients in channel.

func (*Node) PresenceStats

func (n *Node) PresenceStats(ch string) (PresenceStatsResult, error)

PresenceStats returns presence stats from PresenceManager.

func (*Node) Publish

func (n *Node) Publish(channel string, data []byte, opts ...PublishOption) (PublishResult, error)

Publish sends data to all clients subscribed on channel at this moment. All running nodes will receive Publication and send it to all local channel subscribers.

Data expected to be valid marshaled JSON or any binary payload. Connections that work over JSON protocol can not handle binary payloads. Connections that work over Protobuf protocol can work both with JSON and binary payloads.

So the rule here: if you have channel subscribers that work using JSON protocol then you can not publish binary data to these channel.

Channels in Centrifuge are ephemeral and its settings not persisted over different publish operations. So if you want to have channel with history stream behind you need to provide WithHistory option on every publish. To simplify working with different channels you can make some type of publish wrapper in your own code.

The returned PublishResult contains embedded StreamPosition that describes position inside stream Publication was added too. For channels without history enabled (i.e. when Publications only sent to PUB/SUB system) StreamPosition will be an empty struct (i.e. PublishResult.Offset will be zero).

func (*Node) Refresh added in v0.18.0

func (n *Node) Refresh(userID string, opts ...RefreshOption) error

Refresh user connection. Without any options will make user connections non-expiring. Note, that OnRefresh event won't be called in this case since this is a server-side refresh.

func (*Node) RemoveHistory

func (n *Node) RemoveHistory(ch string) error

RemoveHistory removes channel history.

func (*Node) Run

func (n *Node) Run() error

Run performs node startup actions. At moment must be called once on start after Broker set to Node.

func (*Node) SetBroker

func (n *Node) SetBroker(b Broker)

SetBroker allows to set Broker implementation to use.

func (*Node) SetPresenceManager

func (n *Node) SetPresenceManager(m PresenceManager)

SetPresenceManager allows to set PresenceManager to use.

func (*Node) Shutdown

func (n *Node) Shutdown(ctx context.Context) error

Shutdown sets shutdown flag to Node so handlers could stop accepting new requests and disconnects clients with shutdown reason.

func (*Node) Subscribe added in v0.16.0

func (n *Node) Subscribe(userID string, channel string, opts ...SubscribeOption) error

Subscribe subscribes user to a channel. Note, that OnSubscribe event won't be called in this case since this is a server-side subscription. If user have been already subscribed to a channel then its subscription will be updated and subscribe notification will be sent to a client-side.

func (*Node) Survey added in v0.15.0

func (n *Node) Survey(ctx context.Context, op string, data []byte) (map[string]SurveyResult, error)

Survey allows collecting data from all running Centrifuge nodes. This method publishes control messages, then waits for replies from all running nodes. The maximum time to wait can be controlled over context timeout. If provided context does not have a deadline for survey then this method uses default 10 seconds timeout. Keep in mind that Survey does not scale very well as number of Centrifuge Node grows. Though it has reasonably good performance to perform rare tasks even with relatively large number of nodes.

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(userID string, channel string, opts ...UnsubscribeOption) error

Unsubscribe unsubscribes user from a channel. If a channel is empty string then user will be unsubscribed from all channels.

type NodeInfo

type NodeInfo struct {
	UID         string
	Name        string
	Version     string
	NumClients  uint32
	NumUsers    uint32
	NumSubs     uint32
	NumChannels uint32
	Uptime      uint32
	Metrics     *Metrics
	Data        []byte
}

NodeInfo contains information about node.

type NodeInfoSendHandler added in v0.18.0

type NodeInfoSendHandler func() NodeInfoSendReply

NodeInfoSendHandler called every time the control node frame is published and allows modifying Node control frame sending. Currently attaching an arbitrary data to it. See NodeInfoSendReply.

type NodeInfoSendReply added in v0.18.0

type NodeInfoSendReply struct {
	// Data allows setting an arbitrary data to the control node frame which is
	// published by each Node periodically so it will be available in the
	// result of Node.Info call for the current Node description. Keep this
	// data reasonably small in size.
	Data []byte
}

NodeInfoSendReply can modify sending Node control frame in some ways.

type NotificationEvent added in v0.17.0

type NotificationEvent struct {
	FromNodeID string
	Op         string
	Data       []byte
}

NotificationEvent with Op and Data.

type NotificationHandler added in v0.17.0

type NotificationHandler func(NotificationEvent)

NotificationHandler allows handling notifications.

type PresenceCallback added in v0.13.0

type PresenceCallback func(PresenceReply, error)

PresenceCallback should be called with PresenceReply or error.

type PresenceEvent added in v0.9.0

type PresenceEvent struct {
	Channel string
}

PresenceEvent has channel operation called for.

type PresenceHandler added in v0.9.0

type PresenceHandler func(PresenceEvent, PresenceCallback)

PresenceHandler called when presence request received from client.

type PresenceManager

type PresenceManager interface {
	// Presence returns actual presence information for channel.
	Presence(ch string) (map[string]*ClientInfo, error)
	// PresenceStats returns short stats of current presence data
	// suitable for scenarios when caller does not need full client
	// info returned by presence method.
	PresenceStats(ch string) (PresenceStats, error)
	// AddPresence sets or updates presence information in channel
	// for connection with specified identifier. PresenceManager should
	// have a property to expire client information that was not updated
	// (touched) after some configured time interval.
	AddPresence(ch string, clientID string, info *ClientInfo) error
	// RemovePresence removes presence information for connection
	// with specified identifier.
	RemovePresence(ch string, clientID string) error
}

PresenceManager is responsible for channel presence management.

type PresenceReply added in v0.9.0

type PresenceReply struct {
	Result *PresenceResult
}

PresenceReply contains fields determining the reaction on presence request.

type PresenceResult added in v0.10.0

type PresenceResult struct {
	Presence map[string]*ClientInfo
}

PresenceResult wraps presence.

type PresenceStats

type PresenceStats struct {
	// NumClients is a number of client connections in channel.
	NumClients int
	// NumUsers is a number of unique users in channel.
	NumUsers int
}

PresenceStats represents a short presence information for channel.

type PresenceStatsCallback added in v0.13.0

type PresenceStatsCallback func(PresenceStatsReply, error)

PresenceStatsCallback should be called with PresenceStatsReply or error.

type PresenceStatsEvent added in v0.10.0

type PresenceStatsEvent struct {
	Channel string
}

PresenceStatsEvent has channel operation called for.

type PresenceStatsHandler added in v0.10.0

type PresenceStatsHandler func(PresenceStatsEvent, PresenceStatsCallback)

PresenceStatsHandler must handle incoming command from client.

type PresenceStatsReply added in v0.10.0

type PresenceStatsReply struct {
	Result *PresenceStatsResult
}

PresenceStatsReply contains fields determining the reaction on presence request.

type PresenceStatsResult added in v0.10.0

type PresenceStatsResult struct {
	PresenceStats
}

PresenceStatsResult wraps presence stats.

type ProtocolType added in v0.1.0

type ProtocolType string

ProtocolType represents client connection transport encoding format.

const (
	// ProtocolTypeJSON means JSON-based protocol.
	ProtocolTypeJSON ProtocolType = "json"
	// ProtocolTypeProtobuf means protobuf protocol.
	ProtocolTypeProtobuf ProtocolType = "protobuf"
)

type Publication

type Publication struct {
	// Offset is an incremental position number inside a history stream.
	// Zero value means that channel does not maintain Publication stream.
	Offset uint64
	// Data published to a channel.
	Data []byte
	// Info is an optional information about client connection published this data.
	Info *ClientInfo
}

Publication is a data sent to a channel.

type PublishCallback added in v0.13.0

type PublishCallback func(PublishReply, error)

PublishCallback should be called with PublishReply or error.

type PublishEvent

type PublishEvent struct {
	// Channel client wants to publish data to.
	Channel string
	// Data client wants to publish.
	Data []byte
	// ClientInfo about client connection.
	ClientInfo *ClientInfo
}

PublishEvent contains fields related to publish event. Note that this event called before actual publish to Broker so handler has an option to reject this publication returning an error.

type PublishHandler

type PublishHandler func(PublishEvent, PublishCallback)

PublishHandler called when client publishes into channel.

type PublishOption

type PublishOption func(*PublishOptions)

PublishOption is a type to represent various Publish options.

func WithClientInfo added in v0.13.0

func WithClientInfo(info *ClientInfo) PublishOption

WithClientInfo adds ClientInfo to Publication.

func WithHistory added in v0.12.0

func WithHistory(size int, ttl time.Duration) PublishOption

WithHistory tells Broker to save message to history stream with provided size and ttl.

type PublishOptions

type PublishOptions struct {
	// HistoryTTL sets history ttl to expire inactive history streams.
	// Current Broker implementations only work with seconds resolution for TTL.
	HistoryTTL time.Duration
	// HistorySize sets history size limit to prevent infinite stream growth.
	HistorySize int
	// ClientInfo to include into Publication. By default no ClientInfo will be appended.
	ClientInfo *ClientInfo
}

PublishOptions define some fields to alter behaviour of Publish operation.

type PublishReply

type PublishReply struct {
	// Options to control publication.
	Options PublishOptions

	// Result if set will tell Centrifuge that message already published to
	// channel by handler code. In this case Centrifuge won't try to publish
	// into channel again after handler returned PublishReply. This can be
	// useful if you need to know new Publication offset in your code or you
	// want to make sure message successfully published to Broker on server
	// side (otherwise only client will get an error).
	Result *PublishResult
}

PublishReply contains fields determining the result on publish.

type PublishResult added in v0.8.0

type PublishResult struct {
	StreamPosition
}

PublishResult returned from Publish operation.

type RPCCallback added in v0.13.0

type RPCCallback func(RPCReply, error)

RPCCallback should be called as soon as handler decides what to do with connection RPCEvent.

type RPCEvent

type RPCEvent struct {
	// Method is an optional string that contains RPC method name client wants to call.
	// This is an optional field, by default clients send RPC without any method set.
	Method string
	// Data contains RPC untouched payload.
	Data []byte
}

RPCEvent contains fields related to rpc request.

type RPCHandler

type RPCHandler func(RPCEvent, RPCCallback)

RPCHandler must handle incoming command from client.

type RPCReply

type RPCReply struct {
	// Data to return in RPC reply to client.
	Data []byte
}

RPCReply contains fields determining the reaction on rpc request.

type RedisBroker added in v0.16.0

type RedisBroker struct {
	// contains filtered or unexported fields
}

RedisBroker uses Redis to implement Broker functionality. This broker allows scaling Centrifuge-based server to many instances and load balance client connections between them. RedisBroker additionally supports Redis Sentinel, client-side consistent sharding and can work with Redis Cluster (including client-side sharding between different Redis Clusters to scale PUB/SUB). By default Redis >= 5 required (due to the fact RedisBroker uses STREAM data structure).

func NewRedisBroker added in v0.16.0

func NewRedisBroker(n *Node, config RedisBrokerConfig) (*RedisBroker, error)

NewRedisBroker initializes Redis Broker.

func (*RedisBroker) History added in v0.16.0

func (b *RedisBroker) History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error)

History - see Broker.History.

func (*RedisBroker) Publish added in v0.16.0

func (b *RedisBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error)

Publish - see Broker.Publish.

func (*RedisBroker) PublishControl added in v0.16.0

func (b *RedisBroker) PublishControl(data []byte, nodeID, _ string) error

PublishControl - see Broker.PublishControl.

func (*RedisBroker) PublishJoin added in v0.16.0

func (b *RedisBroker) PublishJoin(ch string, info *ClientInfo) error

PublishJoin - see Broker.PublishJoin.

func (*RedisBroker) PublishLeave added in v0.16.0

func (b *RedisBroker) PublishLeave(ch string, info *ClientInfo) error

PublishLeave - see Broker.PublishLeave.

func (*RedisBroker) RemoveHistory added in v0.16.0

func (b *RedisBroker) RemoveHistory(ch string) error

RemoveHistory - see Broker.RemoveHistory.

func (*RedisBroker) Run added in v0.16.0

Run – see Broker.Run.

func (*RedisBroker) Subscribe added in v0.16.0

func (b *RedisBroker) Subscribe(ch string) error

Subscribe - see Broker.Subscribe.

func (*RedisBroker) Unsubscribe added in v0.16.0

func (b *RedisBroker) Unsubscribe(ch string) error

Unsubscribe - see Broker.Unsubscribe.

type RedisBrokerConfig added in v0.16.0

type RedisBrokerConfig struct {
	// Prefix to use before every channel name and key in Redis. By default
	// DefaultRedisBrokerPrefix will be used.
	Prefix string

	// HistoryMetaTTL sets a time of stream meta key expiration in Redis. Stream
	// meta key is a Redis HASH that contains top offset in channel and epoch value.
	// By default stream meta keys do not expire.
	//
	// Though in some cases – when channels created for а short time and then
	// not used anymore – created stream meta keys can stay in memory while
	// not actually useful. For example you can have a personal user channel but
	// after using your app for a while user left it forever. In long-term
	// perspective this can be an unwanted memory leak. Setting a reasonable
	// value to this option (usually much bigger than history retention period)
	// can help. In this case unused channel stream meta data will eventually expire.
	//
	// TODO v1: since we have epoch, things should also properly work without meta
	// information at all (but we loose possibility of long-term recover in stream
	// without new messages). We can make this optional and disabled by default at
	// least.
	HistoryMetaTTL time.Duration

	// UseLists allows enabling usage of Redis LIST instead of STREAM data
	// structure to keep history. LIST support exist mostly for backward
	// compatibility since STREAM seems superior. If you have a use case
	// where you need to turn on this option in new setup - please share,
	// otherwise LIST support can be removed at some point in the future.
	// Iteration over history in reversed order not supported with lists.
	UseLists bool

	// PubSubNumWorkers sets how many PUB/SUB message processing workers will
	// be started. By default runtime.NumCPU() workers used.
	PubSubNumWorkers int

	// Shards is a list of Redis shards to use. At least one shard must be provided.
	Shards []*RedisShard
}

type RedisPresenceManager added in v0.16.0

type RedisPresenceManager struct {
	// contains filtered or unexported fields
}

RedisPresenceManager keeps presence in Redis thus allows scaling nodes.

func NewRedisPresenceManager added in v0.16.0

func NewRedisPresenceManager(n *Node, config RedisPresenceManagerConfig) (*RedisPresenceManager, error)

NewRedisPresenceManager creates new RedisPresenceManager.

func (*RedisPresenceManager) AddPresence added in v0.16.0

func (m *RedisPresenceManager) AddPresence(ch string, uid string, info *ClientInfo) error

AddPresence - see PresenceManager interface description.

func (*RedisPresenceManager) Presence added in v0.16.0

func (m *RedisPresenceManager) Presence(ch string) (map[string]*ClientInfo, error)

Presence - see PresenceManager interface description.

func (*RedisPresenceManager) PresenceStats added in v0.16.0

func (m *RedisPresenceManager) PresenceStats(ch string) (PresenceStats, error)

PresenceStats - see PresenceManager interface description.

func (*RedisPresenceManager) RemovePresence added in v0.16.0

func (m *RedisPresenceManager) RemovePresence(ch string, uid string) error

RemovePresence - see PresenceManager interface description.

type RedisPresenceManagerConfig added in v0.16.0

type RedisPresenceManagerConfig struct {
	// Prefix to use before every channel name and key in Redis. By default
	// DefaultRedisPresenceManagerPrefix will be used.
	Prefix string

	// PresenceTTL is an interval how long to consider presence info
	// valid after receiving presence update. This allows to automatically
	// clean up unnecessary presence entries after TTL passed. Zero value
	// means that DefaultRedisPresenceTTL will be used.
	PresenceTTL time.Duration

	// Shards is a list of Redis shards to use. At least one shard must be provided.
	Shards []*RedisShard
}

RedisPresenceManagerConfig is a config for RedisPresenceManager.

type RedisShard added in v0.16.0

type RedisShard struct {
	// contains filtered or unexported fields
}

func NewRedisShard added in v0.16.0

func NewRedisShard(n *Node, conf RedisShardConfig) (*RedisShard, error)

NewRedisShard initializes new Redis shard.

type RedisShardConfig

type RedisShardConfig struct {
	// Address is a Redis server connection address.
	// This can be:
	// - host:port
	// - tcp://[[:password]@]host:port[/db][?option1=value1&optionN=valueN]
	// - redis://[[:password]@]host:port[/db][?option1=value1&optionN=valueN]
	// - unix://[[:password]@]path[?option1=value1&optionN=valueN]
	Address string

	// ClusterAddresses is a slice of seed cluster addrs for this shard.
	// Each address should be in form of host:port. If ClusterAddresses set then
	// RedisShardConfig.Address not used.
	ClusterAddresses []string

	// SentinelAddresses is a slice of Sentinel addresses. Each address should
	// be in form of host:port. If set then Redis address will be automatically
	// discovered from Sentinel.
	SentinelAddresses []string
	// SentinelMasterName is a name of Redis instance master Sentinel monitors.
	SentinelMasterName string
	// SentinelPassword is a password for Sentinel. Works with Sentinel >= 5.0.1.
	SentinelPassword string

	// DB is Redis database number. If not set then database 0 used.
	DB int
	// Password is password to use when connecting to Redis database.
	// If zero then password not used.
	Password string
	// Whether to use TLS connection or not.
	UseTLS bool
	// Whether to skip hostname verification as part of TLS handshake.
	TLSSkipVerify bool
	// Connection TLS configuration.
	TLSConfig *tls.Config
	// IdleTimeout is timeout after which idle connections to Redis will be closed.
	// If the value is zero, then idle connections are not closed.
	IdleTimeout time.Duration
	// ReadTimeout is a timeout on read operations. Note that at moment it should be greater
	// than node ping publish interval in order to prevent timing out Pub/Sub connection's
	// Receive call.
	// By default DefaultRedisReadTimeout used.
	ReadTimeout time.Duration
	// WriteTimeout is a timeout on write operations.
	// By default DefaultRedisWriteTimeout used.
	WriteTimeout time.Duration
	// ConnectTimeout is a timeout on connect operation.
	// By default DefaultRedisConnectTimeout used.
	ConnectTimeout time.Duration
	// contains filtered or unexported fields
}

RedisShardConfig contains Redis connection options.

type RefreshCallback added in v0.13.0

type RefreshCallback func(RefreshReply, error)

RefreshCallback should be called as soon as handler decides what to do with connection refresh event.

type RefreshEvent

type RefreshEvent struct {
	// ClientSideRefresh is true for refresh initiated by client-side refresh workflow.
	ClientSideRefresh bool
	// Token will only be set in case of using client-side refresh mechanism.
	Token string
}

RefreshEvent contains fields related to refresh event.

type RefreshHandler

type RefreshHandler func(RefreshEvent, RefreshCallback)

RefreshHandler called when it's time to validate client connection and update it's expiration time if it's still actual.

Centrifuge library supports two ways of refreshing connection: client-side and server-side.

The default mechanism is server-side, this means that as soon refresh handler set and connection expiration time happens (by timer) – refresh handler will be called.

If ClientSideRefresh in ConnectReply inside ConnectingHandler set to true then library uses client-side refresh mechanism. In this case library relies on Refresh commands sent from client periodically to refresh connection. Refresh command contains updated connection token.

type RefreshOption added in v0.18.0

type RefreshOption func(options *RefreshOptions)

RefreshOption is a type to represent various Refresh options.

func WithRefreshClient added in v0.18.0

func WithRefreshClient(clientID string) RefreshOption

WithRefreshClient to limit refresh only for specified client ID.

func WithRefreshExpireAt added in v0.18.0

func WithRefreshExpireAt(expireAt int64) RefreshOption

WithRefreshExpireAt to set unix seconds in the future when connection should expire. Zero value means no expiration.

func WithRefreshExpired added in v0.18.0

func WithRefreshExpired(expired bool) RefreshOption

WithRefreshExpired to set expired flag - connection will be closed with DisconnectExpired.

func WithRefreshInfo added in v0.18.0

func WithRefreshInfo(info []byte) RefreshOption

WithRefreshInfo to override connection info.

type RefreshOptions added in v0.18.0

type RefreshOptions struct {
	// Expired can close connection with expired reason.
	Expired bool
	// ExpireAt defines time in future when subscription should expire,
	// zero value means no expiration.
	ExpireAt int64
	// Info defines custom channel information, zero value means no channel information.
	Info []byte
	// contains filtered or unexported fields
}

RefreshOptions ...

type RefreshReply

type RefreshReply struct {
	// Expired tells Centrifuge that connection expired. In this case connection will be
	// closed with DisconnectExpired.
	Expired bool
	// ExpireAt defines time in future when connection should expire,
	// zero value means no expiration.
	ExpireAt int64
	// Info allows to modify connection information,
	// zero value means no modification of current connection Info.
	Info []byte
}

RefreshReply contains fields determining the reaction on refresh event.

type SockjsConfig

type SockjsConfig struct {
	// HandlerPrefix sets prefix for SockJS handler endpoint path.
	HandlerPrefix string

	// URL is an address to SockJS client javascript library.
	URL string

	// HeartbeatDelay sets how often to send heartbeat frames to clients.
	HeartbeatDelay time.Duration

	// CheckOrigin allows to decide whether to use CORS or not in XHR case.
	// When false returned then CORS headers won't be set.
	CheckOrigin func(*http.Request) bool

	// WebsocketCheckOrigin allows to set custom CheckOrigin func for underlying
	// Gorilla Websocket based websocket.Upgrader.
	WebsocketCheckOrigin func(*http.Request) bool

	// WebsocketReadBufferSize is a parameter that is used for raw websocket websocket.Upgrader.
	// If set to zero reasonable default value will be used.
	WebsocketReadBufferSize int

	// WebsocketWriteBufferSize is a parameter that is used for raw websocket websocket.Upgrader.
	// If set to zero reasonable default value will be used.
	WebsocketWriteBufferSize int

	// WebsocketUseWriteBufferPool enables using buffer pool for writes in Websocket transport.
	WebsocketUseWriteBufferPool bool

	// WebsocketWriteTimeout is maximum time of write message operation.
	// Slow client will be disconnected.
	// By default DefaultWebsocketWriteTimeout will be used.
	WebsocketWriteTimeout time.Duration
}

SockjsConfig represents config for SockJS handler.

type SockjsHandler

type SockjsHandler struct {
	// contains filtered or unexported fields
}

SockjsHandler accepts SockJS connections. SockJS has a bunch of fallback transports when WebSocket connection is not supported. It comes with additional costs though: small protocol framing overhead, lack of binary support, more goroutines per connection, and you need to use sticky session mechanism on your load balancer in case you are using HTTP-based SockJS fallbacks and have more than one Centrifuge Node on a backend (so SockJS to be able to emulate bidirectional protocol). So if you can afford it - use WebsocketHandler only.

func NewSockjsHandler

func NewSockjsHandler(n *Node, c SockjsConfig) *SockjsHandler

NewSockjsHandler creates new SockjsHandler.

func (*SockjsHandler) ServeHTTP

func (s *SockjsHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)

type StreamPosition added in v0.8.0

type StreamPosition struct {
	// Offset defines publication incremental offset inside a stream.
	Offset uint64
	// Epoch allows handling situations when storage
	// lost stream entirely for some reason (expired or lost after restart) and we
	// want to track this fact to prevent successful recovery from another stream.
	// I.e. for example we have stream [1, 2, 3], then it's lost and new stream
	// contains [1, 2, 3, 4], client that recovers from position 3 will only receive
	// publication 4 missing 1, 2, 3 from new stream. With epoch we can tell client
	// that correct recovery is not possible.
	Epoch string
}

StreamPosition contains fields to describe position in stream. At moment this is used for automatic recovery mechanics. More info about stream recovery in docs: https://centrifugal.github.io/centrifugo/server/recover/.

type SubRefreshCallback added in v0.13.0

type SubRefreshCallback func(SubRefreshReply, error)

SubRefreshCallback should be called as soon as handler decides what to do with connection SubRefreshEvent.

type SubRefreshEvent

type SubRefreshEvent struct {
	// ClientSideRefresh is true for refresh initiated by client-side subscription
	// refresh workflow.
	ClientSideRefresh bool
	// Channel to which SubRefreshEvent belongs to.
	Channel string
	// Token will only be set in case of using client-side subscription refresh mechanism.
	Token string
}

SubRefreshEvent contains fields related to subscription refresh event.

type SubRefreshHandler

type SubRefreshHandler func(SubRefreshEvent, SubRefreshCallback)

SubRefreshHandler called when it's time to validate client subscription to channel and update it's state if needed.

If ClientSideRefresh in SubscribeReply inside SubscribeHandler set to true then library uses client-side subscription refresh mechanism. In this case library relies on SubRefresh commands sent from client periodically to refresh subscription. SubRefresh command contains updated subscription token.

type SubRefreshReply

type SubRefreshReply struct {
	// Expired tells Centrifuge that subscription expired. In this case connection will be
	// closed with DisconnectExpired.
	Expired bool
	// ExpireAt is a new Unix time of expiration. Zero value means no expiration.
	ExpireAt int64
	// Info is a new channel-scope info. Zero value means do not change previous one.
	Info []byte
}

SubRefreshReply contains fields determining the reaction on subscription refresh event.

type SubscribeCallback added in v0.13.0

type SubscribeCallback func(SubscribeReply, error)

SubscribeCallback should be called as soon as handler decides what to do with connection subscribe event.

type SubscribeEvent

type SubscribeEvent struct {
	// Channel client wants to subscribe to.
	Channel string
	// Token will only be set for token channels. This is a task of application
	// to check that subscription to a channel has valid token.
	Token string
}

SubscribeEvent contains fields related to subscribe event.

type SubscribeHandler

type SubscribeHandler func(SubscribeEvent, SubscribeCallback)

SubscribeHandler called when client wants to subscribe on channel.

type SubscribeOption added in v0.16.0

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is a type to represent various Subscribe options.

func WithChannelInfo added in v0.16.0

func WithChannelInfo(chanInfo []byte) SubscribeOption

WithChannelInfo ...

func WithExpireAt added in v0.16.0

func WithExpireAt(expireAt int64) SubscribeOption

WithExpireAt allows to set ExpireAt field.

func WithJoinLeave added in v0.16.0

func WithJoinLeave(enabled bool) SubscribeOption

WithJoinLeave ...

func WithPosition added in v0.16.0

func WithPosition(enabled bool) SubscribeOption

WithPosition ...

func WithPresence added in v0.16.0

func WithPresence(enabled bool) SubscribeOption

WithPresence ...

func WithRecover added in v0.16.0

func WithRecover(enabled bool) SubscribeOption

WithRecover ...

func WithRecoverSince added in v0.18.0

func WithRecoverSince(since *StreamPosition) SubscribeOption

WithRecoverSince allows setting SubscribeOptions.RecoverFrom.

func WithSubscribeClient added in v0.16.0

func WithSubscribeClient(clientID string) SubscribeOption

WithSubscribeClient allows setting client ID that should be subscribed. This option not used when Client.Subscribe called.

func WithSubscribeData added in v0.16.0

func WithSubscribeData(data []byte) SubscribeOption

WithSubscribeData allows setting custom data to send with subscribe push.

type SubscribeOptions added in v0.13.0

type SubscribeOptions struct {
	// ExpireAt defines time in future when subscription should expire,
	// zero value means no expiration.
	ExpireAt int64
	// ChannelInfo defines custom channel information, zero value means no channel information.
	ChannelInfo []byte
	// Presence turns on participating in channel presence.
	Presence bool
	// JoinLeave enables sending Join and Leave messages for this client in channel.
	JoinLeave bool
	// When position is on client will additionally sync its position inside
	// a stream to prevent message loss. Make sure you are enabling Position in channels
	// that maintain Publication history stream. When Position is on  Centrifuge will
	// include StreamPosition information to subscribe response - for a client to be able
	// to manually track its position inside a stream.
	Position bool
	// Recover turns on recovery option for a channel. In this case client will try to
	// recover missed messages automatically upon resubscribe to a channel after reconnect
	// to a server. This option also enables client position tracking inside a stream
	// (like Position option) to prevent occasional message loss. Make sure you are using
	// Recover in channels that maintain Publication history stream.
	Recover bool
	// Data to send to a client with Subscribe Push.
	Data []byte
	// RecoverSince will try to subscribe a client and recover from a certain StreamPosition.
	RecoverSince *StreamPosition
	// contains filtered or unexported fields
}

SubscribeOptions define per-subscription options.

type SubscribeReply

type SubscribeReply struct {
	// Options to control subscription.
	Options SubscribeOptions

	// ClientSideRefresh tells library to use client-side refresh logic: i.e. send
	// SubRefresh commands with new Subscription Token. If not set then server-side
	// SubRefresh handler will be used.
	ClientSideRefresh bool
}

SubscribeReply contains fields determining the reaction on subscribe event.

type SubscribeRequest added in v0.16.0

type SubscribeRequest struct {
	// Recover enables publication recovery for a channel.
	Recover bool
	// Epoch last seen by a client.
	Epoch string
	// Offset last seen by a client.
	Offset uint64
}

SubscribeRequest contains state of subscription to a channel.

type SurveyCallback added in v0.15.0

type SurveyCallback func(SurveyReply)

SurveyCallback should be called with SurveyReply as soon as survey completed.

type SurveyEvent added in v0.15.0

type SurveyEvent struct {
	Op   string
	Data []byte
}

SurveyEvent with Op and Data of survey.

type SurveyHandler added in v0.15.0

type SurveyHandler func(SurveyEvent, SurveyCallback)

SurveyHandler allows to set survey handler function.

type SurveyReply added in v0.15.0

type SurveyReply struct {
	Code uint32
	Data []byte
}

SurveyReply contains survey reply fields.

type SurveyResult added in v0.15.0

type SurveyResult struct {
	Code uint32
	Data []byte
}

SurveyResult from node.

type Transport

type Transport interface {
	TransportInfo
	// Write should write single push data into a connection. Every byte slice
	// here is a single Reply (or Push for unidirectional transport) encoded
	// according transport ProtocolType.
	Write([]byte) error
	// WriteMany should write data into a connection. Every byte slice here is a
	// single Reply (or Push for unidirectional transport) encoded according
	// transport ProtocolType. The reason why we have both Write and WriteMany
	// here is to have a path without any additional allocations for massive
	// broadcasts (since variadic args cause allocation).
	WriteMany(...[]byte) error
	// Close must close a transport. Transport implementation can optionally
	// handle Disconnect passed here. For example builtin WebSocket transport
	// sends Disconnect as part of websocket.CloseMessage.
	Close(*Disconnect) error
}

Transport abstracts a connection transport between server and client. It does not contain Read method as reading can be handled by connection handler code (for example by WebsocketHandler.ServeHTTP).

type TransportInfo

type TransportInfo interface {
	// Name returns a name of transport.
	Name() string
	// Protocol returns an underlying transport protocol type used.
	// JSON or Protobuf types are supported.
	Protocol() ProtocolType
	// Unidirectional returns whether transport is unidirectional. For
	// unidirectional transports Centrifuge uses Push protobuf messages
	// without additional wrapping into Reply protocol message.
	Unidirectional() bool
	// DisabledPushFlags returns a disabled push flags for specific transport.
	// For example this allows to disable sending Disconnect push in case of
	// bidirectional WebSocket implementation since disconnect data sent inside
	// Close frame.
	DisabledPushFlags() uint64
}

TransportInfo has read-only transport description methods.

type TransportWriteEvent added in v0.18.0

type TransportWriteEvent struct {
	Data   []byte
	IsPush bool
}

TransportWriteEvent with encoded Data and IsPush flag.

type TransportWriteHandler added in v0.18.0

type TransportWriteHandler func(*Client, TransportWriteEvent) bool

TransportWriteHandler called just before writing data to Transport. At this moment application can skip sending data to a client returning false from a handler. The main purpose of this handler is not a message filtering based on data content but rather tracing and throttling stuff.

type UnsubscribeEvent

type UnsubscribeEvent struct {
	// Channel client unsubscribed from.
	Channel string
	// ServerSide set to true for server-side subscription unsubscribe events.
	ServerSide bool
}

UnsubscribeEvent contains fields related to unsubscribe event.

type UnsubscribeHandler

type UnsubscribeHandler func(UnsubscribeEvent)

UnsubscribeHandler called when client unsubscribed from channel.

type UnsubscribeOption added in v0.5.0

type UnsubscribeOption func(options *UnsubscribeOptions)

UnsubscribeOption is a type to represent various Unsubscribe options.

func WithUnsubscribeClient added in v0.16.0

func WithUnsubscribeClient(clientID string) UnsubscribeOption

WithUnsubscribeClient allows setting client ID that should be unsubscribed. This option not used when Client.Unsubscribe called.

type UnsubscribeOptions added in v0.5.0

type UnsubscribeOptions struct {
	// contains filtered or unexported fields
}

UnsubscribeOptions ...

type WebsocketConfig

type WebsocketConfig struct {
	// CompressionLevel sets a level for websocket compression.
	// See possible value description at https://golang.org/pkg/compress/flate/#NewWriter
	CompressionLevel int

	// CompressionMinSize allows to set minimal limit in bytes for
	// message to use compression when writing it into client connection.
	// By default it's 0 - i.e. all messages will be compressed when
	// WebsocketCompression enabled and compression negotiated with client.
	CompressionMinSize int

	// ReadBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	ReadBufferSize int

	// WriteBufferSize is a parameter that is used for raw websocket Upgrader.
	// If set to zero reasonable default value will be used.
	WriteBufferSize int

	// MessageSizeLimit sets the maximum size in bytes of allowed message from client.
	// By default DefaultWebsocketMaxMessageSize will be used.
	MessageSizeLimit int

	// CheckOrigin func to provide custom origin check logic.
	// nil means allow all origins.
	CheckOrigin func(r *http.Request) bool

	// PingInterval sets interval server will send ping messages to clients.
	// By default DefaultPingInterval will be used.
	PingInterval time.Duration

	// WriteTimeout is maximum time of write message operation.
	// Slow client will be disconnected.
	// By default DefaultWebsocketWriteTimeout will be used.
	WriteTimeout time.Duration

	// Compression allows to enable websocket permessage-deflate
	// compression support for raw websocket connections. It does
	// not guarantee that compression will be used - i.e. it only
	// says that server will try to negotiate it with client.
	Compression bool

	// UseWriteBufferPool enables using buffer pool for writes.
	UseWriteBufferPool bool
}

WebsocketConfig represents config for WebsocketHandler.

type WebsocketHandler

type WebsocketHandler struct {
	// contains filtered or unexported fields
}

WebsocketHandler handles WebSocket client connections. WebSocket protocol is a bidirectional connection between a client an a server for low-latency communication.

func NewWebsocketHandler

func NewWebsocketHandler(n *Node, c WebsocketConfig) *WebsocketHandler

NewWebsocketHandler creates new WebsocketHandler.

func (*WebsocketHandler) ServeHTTP

func (s *WebsocketHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request)

Directories

Path Synopsis
_examples module
internal
priority
Package priority provides priority queue.
Package priority provides priority queue.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL