Documentation ¶
Index ¶
- type AppNode
- type CachedEncodedMessage
- type Config
- type Connection
- type Controller
- type DisconnectQueue
- type DisconnectQueueConfig
- type Disconnector
- type EncodingCache
- type EncodingFunction
- type Executor
- type Hub
- func (h *Hub) AddSession(s *Session)
- func (h *Hub) AddSubscription(sid string, identifier string, stream string)
- func (h *Hub) Broadcast(stream string, data string)
- func (h *Hub) BroadcastMessage(msg *common.StreamMessage)
- func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (h *Hub) RemoveAllSubscriptions(sid string, identifier string)
- func (h *Hub) RemoveSession(s *Session)
- func (h *Hub) RemoveSubscription(sid string, identifier string, stream string)
- func (h *Hub) Run()
- func (h *Hub) Shutdown()
- func (h *Hub) Size() int
- func (h *Hub) StreamsSize() int
- func (h *Hub) UniqSize() int
- type HubRegistration
- type HubSubscription
- type Node
- func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error)
- func (n *Node) Broadcast(msg *common.StreamMessage)
- func (n *Node) Disconnect(s *Session) error
- func (n *Node) DisconnectNow(s *Session) error
- func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)
- func (n *Node) HandlePubSub(raw []byte)
- func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
- func (n *Node) SetDisconnector(d Disconnector)
- func (n *Node) Shutdown() (err error)
- func (n *Node) Start() error
- func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)
- type NoopDisconnectQueue
- type Session
- func (s *Session) Disconnect(reason string, code int)
- func (s *Session) ReadMessage(message []byte) error
- func (s *Session) Send(msg encoders.EncodedMessage)
- func (s *Session) SendJSONTransmission(msg string)
- func (s *Session) SendMessages()
- func (s *Session) Serve(callback func()) error
- func (s *Session) ServeWithPoll(poller netpoll.Poller, callback func()) error
- func (s *Session) SetEncoder(enc encoders.Encoder)
- func (s *Session) SetExecutor(ex Executor)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AppNode ¶ added in v1.0.1
type AppNode interface { HandlePubSub(msg []byte) Authenticate(s *Session) (*common.ConnectResult, error) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error) Perform(s *Session, msg *common.Message) (*common.CommandResult, error) Disconnect(s *Session) error }
AppNode describes a basic node interface
type CachedEncodedMessage ¶ added in v1.1.0
type CachedEncodedMessage struct {
// contains filtered or unexported fields
}
func NewCachedEncodedMessage ¶ added in v1.1.0
func NewCachedEncodedMessage(msg encoders.EncodedMessage) *CachedEncodedMessage
func (*CachedEncodedMessage) Fetch ¶ added in v1.1.0
func (msg *CachedEncodedMessage) Fetch(id string, callback EncodingFunction) (*ws.SentFrame, error)
func (*CachedEncodedMessage) GetType ¶ added in v1.1.0
func (msg *CachedEncodedMessage) GetType() string
func (*CachedEncodedMessage) MarshalJSON ¶ added in v1.1.0
func (msg *CachedEncodedMessage) MarshalJSON() ([]byte, error)
type Config ¶ added in v1.0.5
type Config struct { // How often server should send Action Cable ping messages (seconds) PingInterval int // How often to refresh node stats (seconds) StatsRefreshInterval int // The max size of the Go routines pool for hub HubGopoolSize int // Whether to use net polling for reading data or spawn a go routine NetpollEnabled bool // The max size of the Go routines pool to process inbound client messages ReadGopoolSize int // The max size of the Go routines pool to process outbound client messages WriteGopoolSize int // How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milliseconds, 'ns' => nanoseconds) PingTimestampPrecision string }
Config contains general application/node settings
type Connection ¶ added in v1.1.0
type Connection interface { Write(msg []byte, deadline time.Time) error WriteBinary(msg []byte, deadline time.Time) error Read() ([]byte, error) Close(code int, reason string) Descriptor() net.Conn }
Connection represents underlying connection
type Controller ¶
type Controller interface { Start() error Shutdown() error Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error) Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error) Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error) Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error }
Controller is an interface describing business-logic handler (e.g. RPC)
type DisconnectQueue ¶
type DisconnectQueue struct {
// contains filtered or unexported fields
}
DisconnectQueue is a rate-limited executor
func NewDisconnectQueue ¶
func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue
NewDisconnectQueue builds new queue with a specified rate (max calls per second)
func (*DisconnectQueue) Enqueue ¶
func (d *DisconnectQueue) Enqueue(s *Session) error
Enqueue adds session to the disconnect queue
func (*DisconnectQueue) Shutdown ¶
func (d *DisconnectQueue) Shutdown() error
Shutdown stops throttling and makes requests one by one
func (*DisconnectQueue) Size ¶
func (d *DisconnectQueue) Size() int
Size returns the number of enqueued tasks
type DisconnectQueueConfig ¶ added in v1.0.1
type DisconnectQueueConfig struct { // Limit the number of Disconnect RPC calls per second Rate int // How much time to wait to call all enqueued calls at exit (in seconds) ShutdownTimeout int }
DisconnectQueueConfig contains DisconnectQueue configuration
func NewDisconnectQueueConfig ¶ added in v1.0.1
func NewDisconnectQueueConfig() DisconnectQueueConfig
NewDisconnectQueueConfig builds a new config
type Disconnector ¶ added in v1.0.1
Disconnector is an interface for disconnect queue implementation
type EncodingCache ¶ added in v1.1.0
type EncodingCache struct {
// contains filtered or unexported fields
}
func NewEncodingCache ¶ added in v1.1.0
func NewEncodingCache() *EncodingCache
func (*EncodingCache) Fetch ¶ added in v1.1.0
func (m *EncodingCache) Fetch( msg encoders.EncodedMessage, encoder string, callback EncodingFunction, ) (*ws.SentFrame, error)
type EncodingFunction ¶ added in v1.1.0
type EncodingFunction = func(encoders.EncodedMessage) (*ws.SentFrame, error)
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub stores all the sessions and the corresponding subscriptions info
func (*Hub) AddSession ¶ added in v1.0.4
AddSession enqueues session registration
func (*Hub) AddSubscription ¶ added in v1.0.4
AddSubscription enqueues adding a subscription for session-identifier pair to the hub
func (*Hub) BroadcastMessage ¶ added in v1.0.4
func (h *Hub) BroadcastMessage(msg *common.StreamMessage)
BroadcastMessage enqueues broadcasting a pre-built StreamMessage
func (*Hub) RemoteDisconnect ¶ added in v1.0.4
func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect enqueues remote disconnect command
func (*Hub) RemoveAllSubscriptions ¶ added in v1.0.4
RemoveAllSubscriptions enqueues removing all subscriptions for session-identifier pair from the hub
func (*Hub) RemoveSession ¶ added in v1.0.4
RemoveSession enqueues session un-registration
func (*Hub) RemoveSubscription ¶ added in v1.0.4
RemoveSubscription enqueues removing a subscription for session-identifier pair from the hub
func (*Hub) StreamsSize ¶
StreamsSize returns the number of unique streams
type HubRegistration ¶ added in v1.0.4
type HubRegistration struct {
// contains filtered or unexported fields
}
HubRegistration represents registration event ("add" or "remove")
type HubSubscription ¶ added in v1.0.4
type HubSubscription struct {
// contains filtered or unexported fields
}
HubSubscription contains information about session-channel(-stream) subscription
type Node ¶
Node represents the whole application
func NewNode ¶
func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node
NewNode builds new node struct
func (*Node) Authenticate ¶
func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error)
Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.
func (*Node) Broadcast ¶
func (n *Node) Broadcast(msg *common.StreamMessage)
Broadcast message to stream
func (*Node) Disconnect ¶
Disconnect adds session to disconnector queue and unregisters session from hub
func (*Node) DisconnectNow ¶
DisconnectNow executes disconnect on controller
func (*Node) HandleCommand ¶
HandleCommand parses incoming message from client and executes the command (if recognized)
func (*Node) HandlePubSub ¶ added in v1.0.1
HandlePubSub parses incoming pubsub message and broadcasts it
func (*Node) RemoteDisconnect ¶ added in v1.0.1
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)
RemoteDisconnect finds a session by identifier and closes it
func (*Node) SetDisconnector ¶ added in v1.0.1
func (n *Node) SetDisconnector(d Disconnector)
SetDisconnector sets disconnector for the node
func (*Node) Unsubscribe ¶
Unsubscribe unsubscribes session from a channel
type NoopDisconnectQueue ¶ added in v1.0.1
type NoopDisconnectQueue struct{}
NoopDisconnectQueue is a non-operational disconnect queue implementation
func NewNoopDisconnector ¶ added in v1.0.1
func NewNoopDisconnector() *NoopDisconnectQueue
NewNoopDisconnector returns a new NoopDisconnectQueue
func (*NoopDisconnectQueue) Enqueue ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Enqueue(s *Session) error
Enqueue does nothing
func (*NoopDisconnectQueue) Run ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Run() error
Run does nothing
func (*NoopDisconnectQueue) Shutdown ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Shutdown() error
Shutdown does nothing
func (*NoopDisconnectQueue) Size ¶ added in v1.0.1
func (d *NoopDisconnectQueue) Size() int
Size returns 0
type Session ¶
type Session struct { UID string Identifiers string Connected bool Log *log.Entry // contains filtered or unexported fields }
Session represents active client
func NewSession ¶
func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string) *Session
NewSession builds a new Session struct from ws connection and http request
func (*Session) Disconnect ¶
Disconnect schedules connection disconnect
func (*Session) ReadMessage ¶ added in v1.1.0
ReadMessage reads messages from ws connection and sends them to node
func (*Session) Send ¶
func (s *Session) Send(msg encoders.EncodedMessage)
Send schedules a data transmission
func (*Session) SendJSONTransmission ¶ added in v1.1.0
SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)
func (*Session) SendMessages ¶
func (s *Session) SendMessages()
SendMessages waits for incoming messages and sends them to the client connection
func (*Session) ServeWithPoll ¶
ServeWithPoll registers the connection within a netpoll and subscribes to Read/Close events