node

package
v1.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2017 License: MIT Imports: 19 Imported by: 0

Documentation

Overview

Package node is a real-time core for Centrifugo server.

Index

Constants

View Source
const (
	// DefaultName of node.
	DefaultName = "centrifugo"
	// DefaultNodePingInterval used in default config.
	DefaultNodePingInterval = 3
)

Variables

View Source
var DefaultConfig = &Config{
	Name:                        DefaultName,
	Debug:                       false,
	AdminPassword:               "",
	AdminSecret:                 "",
	MaxChannelLength:            255,
	PingInterval:                25 * time.Second,
	NodePingInterval:            DefaultNodePingInterval * time.Second,
	NodeInfoCleanInterval:       DefaultNodePingInterval * 3 * time.Second,
	NodeInfoMaxDelay:            DefaultNodePingInterval*2*time.Second + 1*time.Second,
	NodeMetricsInterval:         60 * time.Second,
	PresencePingInterval:        25 * time.Second,
	PresenceExpireInterval:      60 * time.Second,
	ClientMessageWriteTimeout:   0,
	PrivateChannelPrefix:        "$",
	NamespaceChannelBoundary:    ":",
	ClientChannelBoundary:       "&",
	UserChannelBoundary:         "#",
	UserChannelSeparator:        ",",
	ExpiredConnectionCloseDelay: 25 * time.Second,
	StaleConnectionCloseDelay:   25 * time.Second,
	ClientRequestMaxSize:        65536,
	ClientQueueMaxSize:          10485760,
	ClientQueueInitialCapacity:  2,
	ClientChannelLimit:          128,
	Insecure:                    false,
}

DefaultConfig is Config initialized with default values for all fields.

Functions

This section is empty.

Types

type Config

type Config struct {
	// Debug turns on application debug mode.
	Debug bool `json:"debug"`

	// Name of this server node - must be unique, used as human readable
	// and meaningful node identificator.
	Name string `json:"name"`

	// Admin enables admin socket.
	Admin bool `json:"admin"`
	// AdminPassword is an admin password.
	AdminPassword string `json:"-"`
	// AdminSecret is a secret to generate auth token for admin socket connection.
	AdminSecret string `json:"-"`

	// Insecure turns on insecure mode - when it's turned on then no authentication
	// required at all when connecting to Centrifugo, anonymous access and publish
	// allowed for all channels, no connection check performed. This can be suitable
	// for demonstration or personal usage.
	Insecure bool `json:"insecure"`
	// InsecureAPI turns on insecure mode for HTTP API calls. This means that no
	// API sign required when sending commands. This can be useful if you don't want
	// to sign every request - for example if you closed API endpoint with firewall
	// or you want to play with API commands from command line using CURL.
	InsecureAPI bool `json:"insecure_api"`
	// InsecureAdmin turns on insecure mode for admin endpoints - no auth required to
	// connect to admin socket and web interface. Protect admin resources with firewall
	// rules in production when enabling this option.
	InsecureAdmin bool `json:"insecure_admin"`

	// MaxChannelLength is a maximum length of channel name.
	MaxChannelLength int `json:"max_channel_length"`

	// PingInterval sets interval server will send ping messages to clients.
	PingInterval time.Duration `json:"ping_interval"`

	// NodePingInterval is an interval how often node must send ping
	// control message.
	NodePingInterval time.Duration `json:"node_ping_interval"`
	// NodeInfoCleanInterval is an interval in seconds, how often node must clean
	// information about other running nodes.
	NodeInfoCleanInterval time.Duration `json:"node_info_clean_interval"`
	// NodeInfoMaxDelay is an interval in seconds – how many seconds node info
	// considered actual.
	NodeInfoMaxDelay time.Duration `json:"node_info_max_delay"`
	// NodeMetricsInterval detects interval node will use to aggregate metrics.
	NodeMetricsInterval time.Duration `json:"node_metrics_interval"`

	// PresencePingInterval is an interval how often connected clients
	// must update presence info.
	PresencePingInterval time.Duration `json:"presence_ping_interval"`
	// PresenceExpireInterval is an interval how long to consider
	// presence info valid after receiving presence ping.
	PresenceExpireInterval time.Duration `json:"presence_expire_interval"`

	// ExpiredConnectionCloseDelay is an interval given to client to
	// refresh its connection in the end of connection lifetime.
	ExpiredConnectionCloseDelay time.Duration `json:"expired_connection_close_delay"`

	// StaleConnectionCloseDelay is an interval in seconds after which
	// connection will be closed if still not authenticated.
	StaleConnectionCloseDelay time.Duration `json:"stale_connection_close_delay"`

	// MessageWriteTimeout is maximum time of write message operation.
	// Slow client will be disconnected. By default we don't use this option (i.e. it's 0)
	// and slow client connections will be closed when there queue size exceeds
	// ClientQueueMaxSize. In case of SockJS transport we don't have control over it so
	// it only affects raw websocket.
	ClientMessageWriteTimeout time.Duration `json:"client_message_write_timeout"`

	// ClientRequestMaxSize sets maximum size in bytes of allowed client request.
	ClientRequestMaxSize int `json:"client_request_max_size"`
	// ClientQueueMaxSize is a maximum size of client's message queue in bytes.
	// After this queue size exceeded Centrifugo closes client's connection.
	ClientQueueMaxSize int `json:"client_queue_max_size"`
	// ClientQueueInitialCapacity sets initial amount of slots in client message
	// queue. When these slots are full client queue is automatically resized to
	// a bigger size. This option can reduce amount of allocations when message
	// rate is very high and client queue resizes frequently. Note that memory
	// consumption per client connection grows with this option.
	ClientQueueInitialCapacity int `json:"client_queue_initial_capacity"`

	// ClientChannelLimit sets upper limit of channels each client can subscribe to.
	ClientChannelLimit int `json:"client_channel_limit"`

	// UserConnectionLimit limits number of connections from user with the same ID.
	UserConnectionLimit int `json:"user_connection_limit"`

	// PrivateChannelPrefix is a prefix in channel name which indicates that
	// channel is private.
	PrivateChannelPrefix string `json:"private_channel_prefix"`
	// NamespaceChannelBoundary is a string separator which must be put after
	// namespace part in channel name.
	NamespaceChannelBoundary string `json:"namespace_channel_boundary"`
	// UserChannelBoundary is a string separator which must be set before allowed
	// users part in channel name.
	UserChannelBoundary string `json:"user_channel_boundary"`
	// UserChannelSeparator separates allowed users in user part of channel name.
	UserChannelSeparator string `json:"user_channel_separator"`
	// ClientChannelBoundary is a string separator which must be set before client
	// connection ID in channel name so only client with this ID can subscribe on
	// that channel.
	ClientChannelBoundary string `json:"client_channel_separator"`

	// Secret is a secret key, used to sign API requests and client connection tokens.
	Secret string `json:"secret"`

	// ConnLifetime determines time until connection expire, 0 means no connection expire at all.
	ConnLifetime int64 `json:"connection_lifetime"`

	// ChannelOptions embedded to config.
	proto.ChannelOptions `json:"channel_options"`

	// Namespaces - list of namespaces for custom channel options.
	Namespaces []Namespace `json:"namespaces"`
}

Config contains Application configuration options.

func NewConfig

func NewConfig(v config.Getter) *Config

NewConfig creates new node.Config using getter interface.

func (*Config) Validate

func (c *Config) Validate() error

Validate validates config and returns error if problems found

type Mediator

type Mediator interface {
	Connect(client string, user string) bool
	Subscribe(ch string, client string, user string) bool
	Unsubscribe(ch string, client string, user string)
	Disconnect(client string, user string)
	Message(ch string, data []byte, client string, info *proto.ClientInfo) bool
}

Mediator is an interface to work with Centrifugo events from Go code. Implemented Mediator must be set to Application via corresponding Application method SetMediator.

type Namespace

type Namespace struct {
	// Name is a unique namespace name.
	Name NamespaceKey `json:"name"`

	// ChannelOptions for namespace determine channel options for channels belonging to this namespace.
	proto.ChannelOptions `mapstructure:",squash"`
}

Namespace allows to create channels with different channel options within the Project

type NamespaceKey

type NamespaceKey string

NamespaceKey is a name of namespace unique for project.

type Node

type Node struct {
	// TODO: make private.
	sync.RWMutex
	// contains filtered or unexported fields
}

Node is a heart of Centrifugo – it internally manages client and admin hubs, maintains information about other Centrifugo nodes, keeps references to config, engine, metrics etc.

func New

func New(version string, c *Config) *Node

New creates Node, the only required argument is config.

func (*Node) AddClientConn

func (n *Node) AddClientConn(c conns.ClientConn) error

AddClientConn registers authenticated connection in clientConnectionHub this allows to make operations with user connection on demand.

func (*Node) AddClientSub

func (n *Node) AddClientSub(ch string, c conns.ClientConn) error

AddClientSub registers subscription of connection on channel in both engine and clientSubscriptionHub.

func (*Node) AddPresence

func (n *Node) AddPresence(ch string, uid string, info proto.ClientInfo) error

AddPresence proxies presence adding to engine.

func (*Node) AdminHub

func (n *Node) AdminHub() conns.AdminHub

AdminHub returns node's admin hub.

func (*Node) AdminMsg

func (n *Node) AdminMsg(msg *proto.AdminMessage) error

AdminMsg handlesadmin message broadcasting it to all admins connected to this node.

func (*Node) ChannelOpts

func (n *Node) ChannelOpts(ch string) (proto.ChannelOptions, error)

ChannelOpts returns channel options for channel using current application structure.

func (*Node) Channels

func (n *Node) Channels() ([]string, error)

Channels returns list of all engines clients subscribed on all Centrifugo nodes.

func (*Node) ClientAllowed

func (n *Node) ClientAllowed(ch string, client string) bool

ClientAllowed checks if client can subscribe on channel - as channel can contain special part in the end to indicate which client allowed to subscribe on it.

func (*Node) ClientHub

func (n *Node) ClientHub() conns.ClientHub

ClientHub returns node's client hub.

func (*Node) ClientMsg

func (n *Node) ClientMsg(msg *proto.Message) error

ClientMsg handles messages published by web application or client into channel. The goal of this method to deliver this message to all clients on this node subscribed on channel.

func (*Node) Config

func (n *Node) Config() Config

Config returns a copy of node Config.

func (*Node) ControlMsg

func (n *Node) ControlMsg(cmd *proto.ControlMessage) error

ControlMsg handles messages from control channel - control messages used for internal communication between nodes to share state or proto.

func (*Node) Disconnect

func (n *Node) Disconnect(user string, reconnect bool) error

Disconnect allows to close all user connections to Centrifugo.

func (*Node) Engine

func (n *Node) Engine() engine.Engine

Engine returns node's Engine.

func (*Node) History

func (n *Node) History(ch string) ([]proto.Message, error)

History returns a slice of last messages published into project channel.

func (*Node) JoinMsg

func (n *Node) JoinMsg(msg *proto.JoinMessage) error

JoinMsg handles JoinMessage.

func (*Node) LastMessageID

func (n *Node) LastMessageID(ch string) (string, error)

LastMessageID return last message id for channel.

func (*Node) LeaveMsg

func (n *Node) LeaveMsg(msg *proto.LeaveMessage) error

LeaveMsg handles leave message.

func (*Node) Mediator

func (n *Node) Mediator() Mediator

Config returns a copy of node Config.

func (*Node) Node

func (n *Node) Node() proto.NodeInfo

Node returns raw information only from current node.

func (*Node) NotifyShutdown

func (n *Node) NotifyShutdown() chan struct{}

NotifyShutdown returns a channel which will be closed on node shutdown.

func (*Node) Presence

func (n *Node) Presence(ch string) (map[string]proto.ClientInfo, error)

Presence returns a map of active clients in project channel.

func (*Node) PrivateChannel

func (n *Node) PrivateChannel(ch string) bool

PrivateChannel checks if channel private and therefore subscription request on it must be properly signed on web application backend.

func (*Node) Publish

func (n *Node) Publish(msg *proto.Message, opts *proto.ChannelOptions) <-chan error

Publish sends a message to all clients subscribed on channel. All running nodes will receive it and will send it to all clients on node subscribed on channel.

func (*Node) PublishAdmin

func (n *Node) PublishAdmin(msg *proto.AdminMessage) <-chan error

PublishAdmin publishes message to admins.

func (*Node) PublishJoin

func (n *Node) PublishJoin(msg *proto.JoinMessage, opts *proto.ChannelOptions) <-chan error

PublishJoin allows to publish join message into channel when someone subscribes on it or leave message when someone unsubscribes from channel.

func (*Node) PublishLeave

func (n *Node) PublishLeave(msg *proto.LeaveMessage, opts *proto.ChannelOptions) <-chan error

PublishLeave allows to publish join message into channel when someone subscribes on it or leave message when someone unsubscribes from channel.

func (*Node) Reload

func (n *Node) Reload(getter config.Getter) error

Reload node.

func (*Node) RemoveClientConn

func (n *Node) RemoveClientConn(c conns.ClientConn) error

RemoveClientConn removes client connection from connection registry.

func (*Node) RemoveClientSub

func (n *Node) RemoveClientSub(ch string, c conns.ClientConn) error

RemoveClientSub removes subscription of connection on channel from both engine and clientSubscriptionHub.

func (*Node) RemovePresence

func (n *Node) RemovePresence(ch string, uid string) error

RemovePresence proxies presence removing to engine.

func (*Node) Run

func (n *Node) Run(opts *RunOptions) error

Run performs all startup actions. At moment must be called once on start after engine and structure set.

func (*Node) SetConfig

func (n *Node) SetConfig(c *Config)

SetConfig binds config to application.

func (*Node) Shutdown

func (n *Node) Shutdown() error

Shutdown sets shutdown flag and does various clean ups.

func (*Node) Stats

func (n *Node) Stats() proto.ServerStats

Stats returns aggregated stats from all Centrifugo nodes.

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(user string, ch string) error

Unsubscribe unsubscribes user from channel, if channel is equal to empty string then user will be unsubscribed from all channels.

func (*Node) UserAllowed

func (n *Node) UserAllowed(ch string, user string) bool

UserAllowed checks if user can subscribe on channel - as channel can contain special part in the end to indicate which users allowed to subscribe on it.

func (*Node) Version

func (n *Node) Version() string

Version returns version of node.

type RunOptions

type RunOptions struct {
	Engine   engine.Engine
	Servers  map[string]server.Server
	Mediator Mediator
}

RunOptions struct represents options that must be provided to node Run method.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL