Documentation ¶
Index ¶
- Constants
- Variables
- type AppNode
- type AuthOption
- type AuthOptions
- type Config
- type Connection
- type Controller
- type DisconnectQueue
- type DisconnectQueueConfig
- type Disconnector
- type Executor
- type Node
- func (n *Node) Authenticate(s *Session, options ...AuthOption) (res *common.ConnectResult, err error)
- func (n *Node) Authenticated(s *Session, ids string)
- func (n *Node) Broadcast(msg *common.StreamMessage)
- func (n *Node) Disconnect(s *Session) error
- func (n *Node) DisconnectNow(s *Session) error
- func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)
- func (n *Node) HandleBroadcast(raw []byte)
- func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)
- func (n *Node) HandlePubSub(raw []byte)
- func (n *Node) History(s *Session, msg *common.Message) (err error)
- func (n *Node) Instrumenter() metrics.Instrumenter
- func (n *Node) IsShuttingDown() bool
- func (n *Node) LookupSession(id string) *Session
- func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (n *Node) SetBroker(b broker.Broker)
- func (n *Node) SetDisconnector(d Disconnector)
- func (n *Node) Shutdown(ctx context.Context) (err error)
- func (n *Node) Size() int
- func (n *Node) Start() error
- func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- func (n *Node) TryRestoreSession(s *Session) (restored bool)
- func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- type NoopDisconnectQueue
- type Session
- func (s *Session) AuthenticateOnConnect() bool
- func (s *Session) Disconnect(reason string, code int)
- func (s *Session) DisconnectNow(reason string, code int)
- func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)
- func (s *Session) GetEnv() *common.SessionEnv
- func (s *Session) GetID() string
- func (s *Session) GetIdentifiers() string
- func (s *Session) IsClosed() bool
- func (s *Session) IsConnected() bool
- func (s *Session) IsDisconnectable() bool
- func (s *Session) IsResumeable() bool
- func (s *Session) MarkDisconnectable(val bool)
- func (s *Session) MergeEnv(env *common.SessionEnv)
- func (s *Session) PrevSid() string
- func (s *Session) ReadInternalState(key string) (interface{}, bool)
- func (s *Session) ReadMessage(message []byte) error
- func (s *Session) RestoreFromCache(cached []byte) error
- func (s *Session) Send(msg encoders.EncodedMessage)
- func (s *Session) SendJSONTransmission(msg string)
- func (s *Session) SendMessages()
- func (s *Session) Serve(callback func()) error
- func (s *Session) SetEnv(env *common.SessionEnv)
- func (s *Session) SetID(id string)
- func (s *Session) SetIdentifiers(ids string)
- func (s *Session) String() string
- func (s *Session) ToCacheEntry() ([]byte, error)
- func (s *Session) UnderlyingConn() Connection
- func (s *Session) WriteInternalState(key string, val interface{})
- type SessionOption
- func WithEncoder(enc encoders.Encoder) SessionOption
- func WithExecutor(ex Executor) SessionOption
- func WithHandshakeMessageDeadline(deadline time.Time) SessionOption
- func WithMetrics(m metrics.Instrumenter) SessionOption
- func WithPingInterval(interval time.Duration) SessionOption
- func WithPingPrecision(val string) SessionOption
- func WithPongTimeout(timeout time.Duration) SessionOption
- func WithPrevSID(sid string) SessionOption
- func WithResumable(val bool) SessionOption
- type SubscriptionState
- func (st *SubscriptionState) AddChannel(id string)
- func (st *SubscriptionState) AddChannelStream(id string, stream string)
- func (st *SubscriptionState) Channels() []string
- func (st *SubscriptionState) HasChannel(id string) bool
- func (st *SubscriptionState) RemoveChannel(id string)
- func (st *SubscriptionState) RemoveChannelStream(id string, stream string)
- func (st *SubscriptionState) RemoveChannelStreams(id string) []string
- func (st *SubscriptionState) StreamsFor(id string) []string
- func (st *SubscriptionState) ToMap() map[string][]string
Constants ¶
const ( DISCONNECT_MODE_ALWAYS = "always" DISCONNECT_MODE_AUTO = "auto" DISCONNECT_MODE_NEVER = "never" )
Variables ¶
var DISCONNECT_MODES = []string{DISCONNECT_MODE_ALWAYS, DISCONNECT_MODE_AUTO, DISCONNECT_MODE_NEVER}
Functions ¶
This section is empty.
Types ¶
type AppNode ¶ added in v1.0.1
type AppNode interface { HandlePubSub(msg []byte) LookupSession(id string) *Session Authenticate(s *Session, opts ...AuthOption) (*common.ConnectResult, error) Authenticated(s *Session, identifiers string) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error) Perform(s *Session, msg *common.Message) (*common.CommandResult, error) Disconnect(s *Session) error }
AppNode describes a basic node interface
type AuthOption ¶ added in v1.4.0
type AuthOption = func(*AuthOptions)
func WithDisconnectOnFailure ¶ added in v1.4.0
func WithDisconnectOnFailure(disconnect bool) AuthOption
type AuthOptions ¶ added in v1.4.0
type AuthOptions struct {
DisconnectOnFailure bool
}
type Config ¶ added in v1.0.5
type Config struct { // Define when to invoke Disconnect callback DisconnectMode string // The number of goroutines to use for disconnect calls on shutdown ShutdownDisconnectPoolSize int // How often server should send Action Cable ping messages (seconds) PingInterval int // How often to refresh node stats (seconds) StatsRefreshInterval int // The max size of the goroutines pool for hub HubGopoolSize int // How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milliseconds, 'ns' => nanoseconds) PingTimestampPrecision string // For how long to wait for pong message before disconnecting (seconds) PongTimeout int // For how long to wait for disconnect callbacks to be processed before exiting (seconds) ShutdownTimeout int }
Config contains general application/node settings
type Connection ¶ added in v1.1.0
type Connection interface { Write(msg []byte, deadline time.Time) error WriteBinary(msg []byte, deadline time.Time) error Read() ([]byte, error) Close(code int, reason string) }
Connection represents underlying connection
type Controller ¶
type Controller interface { Start() error Shutdown() error Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error }
Controller is an interface describing business-logic handler (e.g. RPC)
type DisconnectQueue ¶
type DisconnectQueue struct {
// contains filtered or unexported fields
}
DisconnectQueue is a rate-limited executor
func NewDisconnectQueue ¶
func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue
NewDisconnectQueue builds new queue with a specified rate (max calls per second)
func (*DisconnectQueue) Enqueue ¶
func (d *DisconnectQueue) Enqueue(s *Session) error
Enqueue adds session to the disconnect queue
func (*DisconnectQueue) Shutdown ¶
func (d *DisconnectQueue) Shutdown(ctx context.Context) error
Shutdown stops throttling and makes requests one by one
func (*DisconnectQueue) Size ¶
func (d *DisconnectQueue) Size() int
Size returns the number of enqueued tasks
type DisconnectQueueConfig ¶ added in v1.0.1
type DisconnectQueueConfig struct { // Limit the number of Disconnect RPC calls per second Rate int // The size of the channel's buffer for disconnect requests Backlog int // How long to wait for all enqueued calls to be made at exit (in seconds) [DEPRECATED] ShutdownTimeout int }
DisconnectQueueConfig contains DisconnectQueue configuration
func NewDisconnectQueueConfig ¶ added in v1.0.1
func NewDisconnectQueueConfig() DisconnectQueueConfig
NewDisconnectQueueConfig builds a new config
type Disconnector ¶ added in v1.0.1
type Disconnector interface { Run() error Shutdown(ctx context.Context) error Enqueue(*Session) error Size() int }
Disconnector is an interface for disconnect queue implementation
type Executor ¶ added in v1.1.0
type Executor interface { HandleCommand(*Session, *common.Message) error Disconnect(*Session) error }
Executor handles incoming commands (messages)
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node represents the whole application
func NewNode ¶
func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node
NewNode builds new node struct
func (*Node) Authenticate ¶
func (n *Node) Authenticate(s *Session, options ...AuthOption) (res *common.ConnectResult, err error)
Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.
func (*Node) Authenticated ¶ added in v1.4.0
Mark session as authenticated and register it with a hub. Useful when you perform authentication manually, not using a controller.
func (*Node) Broadcast ¶
func (n *Node) Broadcast(msg *common.StreamMessage)
Broadcast message to stream (locally)
func (*Node) Disconnect ¶
Disconnect adds session to disconnector queue and unregisters session from hub
func (*Node) DisconnectNow ¶
DisconnectNow executes disconnect on controller
func (*Node) ExecuteRemoteCommand ¶ added in v1.4.0
func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage)
Execute remote command (locally)
func (*Node) HandleBroadcast ¶ added in v1.4.0
HandleBroadcast parses incoming broadcast message, records it and re-transmits it to other nodes
func (*Node) HandleCommand ¶
HandleCommand parses incoming message from client and executes the command (if recognized)
func (*Node) HandlePubSub ¶ added in v1.0.1
HandlePubSub parses incoming pubsub message and broadcasts it to all clients (w/o using a broker)
func (*Node) History ¶ added in v1.4.0
History fetches the stream history for the specified identifier
func (*Node) Instrumenter ¶ added in v1.4.0
func (n *Node) Instrumenter() metrics.Instrumenter
Return current instrumenter for the node
func (*Node) IsShuttingDown ¶ added in v1.4.2
func (*Node) LookupSession ¶ added in v1.1.4
func (*Node) RemoteDisconnect ¶ added in v1.0.1
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect finds a session by identifier and closes it
func (*Node) SetDisconnector ¶ added in v1.0.1
func (n *Node) SetDisconnector(d Disconnector)
SetDisconnector sets disconnector for the node
func (*Node) TryRestoreSession ¶ added in v1.4.0
func (*Node) Unsubscribe ¶
Unsubscribe unsubscribes session from a channel
type NoopDisconnectQueue ¶ added in v1.0.1
type NoopDisconnectQueue struct{}
NoopDisconnectQueue is non-operational disconnect queue implementation
func NewNoopDisconnector ¶ added in v1.0.1
func NewNoopDisconnector() *NoopDisconnectQueue
NewNoopDisconnector returns new NoopDisconnectQueue
func (*NoopDisconnectQueue) Enqueue ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Enqueue(s *Session) error
Enqueue does nothing
func (*NoopDisconnectQueue) Run ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Run() error
Run does nothing
func (*NoopDisconnectQueue) Shutdown ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Shutdown(ctx context.Context) error
Shutdown does nothing
func (*NoopDisconnectQueue) Size ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Size() int
Size returns 0
type Session ¶
type Session struct { Connected bool // Could be used to store arbitrary data within a session InternalState map[string]interface{} Log *log.Entry // contains filtered or unexported fields }
Session represents active client
func NewSession ¶
func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string, opts ...SessionOption) *Session
NewSession builds a new Session struct from ws connection and http request
func (*Session) AuthenticateOnConnect ¶ added in v1.4.3
func (*Session) Disconnect ¶
Disconnect schedules connection disconnect
func (*Session) DisconnectNow ¶ added in v1.4.4
func (*Session) DisconnectWithMessage ¶ added in v1.2.3
func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string)
func (*Session) GetEnv ¶ added in v1.1.2
func (s *Session) GetEnv() *common.SessionEnv
func (*Session) GetIdentifiers ¶ added in v1.2.3
func (*Session) IsConnected ¶ added in v1.4.0
func (*Session) IsDisconnectable ¶ added in v1.4.0
func (*Session) IsResumeable ¶ added in v1.4.3
func (*Session) MarkDisconnectable ¶ added in v1.4.0
func (*Session) MergeEnv ¶ added in v1.1.4
func (s *Session) MergeEnv(env *common.SessionEnv)
Merge connection and channel states into current env. This method locks the state for writing (so, goroutine-safe)
func (*Session) ReadInternalState ¶ added in v1.4.0
ReadInternalState reads internal state value by key
func (*Session) ReadMessage ¶ added in v1.1.0
ReadMessage reads messages from ws connection and sends them to node
func (*Session) RestoreFromCache ¶ added in v1.4.0
func (*Session) Send ¶
func (s *Session) Send(msg encoders.EncodedMessage)
Send schedules a data transmission
func (*Session) SendJSONTransmission ¶ added in v1.1.0
SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)
func (*Session) SendMessages ¶
func (s *Session) SendMessages()
SendMessages waits for incoming messages and sends them to the client connection
func (*Session) SetEnv ¶ added in v1.1.2
func (s *Session) SetEnv(env *common.SessionEnv)
func (*Session) SetIdentifiers ¶ added in v1.2.3
func (*Session) String ¶ added in v1.4.5
String returns session string representation (for %v in Printf-like functions)
func (*Session) ToCacheEntry ¶ added in v1.4.0
func (*Session) UnderlyingConn ¶ added in v1.4.0
func (s *Session) UnderlyingConn() Connection
func (*Session) WriteInternalState ¶ added in v1.4.0
WriteInternalState writes internal state value by key
type SessionOption ¶ added in v1.4.0
type SessionOption = func(*Session)
func WithEncoder ¶ added in v1.4.3
func WithEncoder(enc encoders.Encoder) SessionOption
WithEncoder allows setting a custom encoder for a session
func WithExecutor ¶ added in v1.4.3
func WithExecutor(ex Executor) SessionOption
WithExecutor allows setting a custom executor for a session
func WithHandshakeMessageDeadline ¶ added in v1.4.3
func WithHandshakeMessageDeadline(deadline time.Time) SessionOption
WithHandshakeMessageDeadline allows setting a custom deadline for handshake messages. This option also indicates that we MUST NOT perform Authenticate on connect.
func WithMetrics ¶ added in v1.4.3
func WithMetrics(m metrics.Instrumenter) SessionOption
WithMetrics allows setting a custom metrics instrumenter for a session
func WithPingInterval ¶ added in v1.4.0
func WithPingInterval(interval time.Duration) SessionOption
WithPingInterval allows setting a custom ping interval for a session or disabling pings altogether (by passing 0)
func WithPingPrecision ¶ added in v1.4.3
func WithPingPrecision(val string) SessionOption
WithPingPrecision allows configuring precision for timestamps attached to pings
func WithPongTimeout ¶ added in v1.4.3
func WithPongTimeout(timeout time.Duration) SessionOption
WithPongTimeout allows setting a custom pong timeout for a session
func WithPrevSID ¶ added in v1.4.3
func WithPrevSID(sid string) SessionOption
WithPrevSID allows providing the previous session ID to restore from
func WithResumable ¶ added in v1.4.3
func WithResumable(val bool) SessionOption
WithResumable allows marking session as resumable (so we store its state in cache)
type SubscriptionState ¶ added in v1.2.3
type SubscriptionState struct {
// contains filtered or unexported fields
}
func NewSubscriptionState ¶ added in v1.2.3
func NewSubscriptionState() *SubscriptionState
func (*SubscriptionState) AddChannel ¶ added in v1.2.3
func (st *SubscriptionState) AddChannel(id string)
func (*SubscriptionState) AddChannelStream ¶ added in v1.2.3
func (st *SubscriptionState) AddChannelStream(id string, stream string)
func (*SubscriptionState) Channels ¶ added in v1.2.3
func (st *SubscriptionState) Channels() []string
func (*SubscriptionState) HasChannel ¶ added in v1.2.3
func (st *SubscriptionState) HasChannel(id string) bool
func (*SubscriptionState) RemoveChannel ¶ added in v1.2.3
func (st *SubscriptionState) RemoveChannel(id string)
func (*SubscriptionState) RemoveChannelStream ¶ added in v1.2.3
func (st *SubscriptionState) RemoveChannelStream(id string, stream string)
func (*SubscriptionState) RemoveChannelStreams ¶ added in v1.2.3
func (st *SubscriptionState) RemoveChannelStreams(id string) []string
func (*SubscriptionState) StreamsFor ¶ added in v1.2.3
func (st *SubscriptionState) StreamsFor(id string) []string
func (*SubscriptionState) ToMap ¶ added in v1.2.3
func (st *SubscriptionState) ToMap() map[string][]string