node

package
v1.1.0-rc.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 1, 2021 License: MIT Imports: 16 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppNode added in v1.0.1

type AppNode interface {
	HandlePubSub(msg []byte)
	Authenticate(s *Session) (*common.ConnectResult, error)
	Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
	Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
	Disconnect(s *Session) error
}

AppNode describes a basic node interface

type CachedEncodedMessage added in v1.1.0

type CachedEncodedMessage struct {
	// contains filtered or unexported fields
}

func NewCachedEncodedMessage added in v1.1.0

func NewCachedEncodedMessage(msg encoders.EncodedMessage) *CachedEncodedMessage

func (*CachedEncodedMessage) Fetch added in v1.1.0

func (msg *CachedEncodedMessage) Fetch(id string, callback EncodingFunction) (*ws.SentFrame, error)

func (*CachedEncodedMessage) GetType added in v1.1.0

func (msg *CachedEncodedMessage) GetType() string

func (*CachedEncodedMessage) MarshalJSON added in v1.1.0

func (msg *CachedEncodedMessage) MarshalJSON() ([]byte, error)

type Config added in v1.0.5

type Config struct {
	// How often server should send Action Cable ping messages (seconds)
	PingInterval int
	// How often to refresh node stats (seconds)
	StatsRefreshInterval int
	// The max size of the Go routines pool for hub
	HubGopoolSize int
	// Whether to use net polling for reading data or spawn a go routine
	NetpollEnabled bool
	// The max size of the Go routines pool to process inbound client messages
	ReadGopoolSize int
	// The max size of the Go routines pool to process outbound client messages
	WriteGopoolSize int
	// How should ping message timestamp be formatted? ('s' => seconds, 'ms' => milliseconds, 'ns' => nanoseconds)
	PingTimestampPrecision string
}

Config contains general application/node settings

func NewConfig added in v1.0.5

func NewConfig() Config

NewConfig builds a new config

type Connection added in v1.1.0

type Connection interface {
	Write(msg []byte, deadline time.Time) error
	WriteBinary(msg []byte, deadline time.Time) error
	Read() ([]byte, error)
	Close(code int, reason string)
	Descriptor() net.Conn
}

Connection represents underlying connection

type Controller

type Controller interface {
	Start() error
	Shutdown() error
	Authenticate(sid string, env *common.SessionEnv) (*common.ConnectResult, error)
	Subscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Unsubscribe(sid string, env *common.SessionEnv, id string, channel string) (*common.CommandResult, error)
	Perform(sid string, env *common.SessionEnv, id string, channel string, data string) (*common.CommandResult, error)
	Disconnect(sid string, env *common.SessionEnv, id string, subscriptions []string) error
}

Controller is an interface describing business-logic handler (e.g. RPC)

type DisconnectQueue

type DisconnectQueue struct {
	// contains filtered or unexported fields
}

DisconnectQueue is a rate-limited executor

func NewDisconnectQueue

func NewDisconnectQueue(node *Node, config *DisconnectQueueConfig) *DisconnectQueue

NewDisconnectQueue builds new queue with a specified rate (max calls per second)

func (*DisconnectQueue) Enqueue

func (d *DisconnectQueue) Enqueue(s *Session) error

Enqueue adds session to the disconnect queue

func (*DisconnectQueue) Run

func (d *DisconnectQueue) Run() error

Run starts queue

func (*DisconnectQueue) Shutdown

func (d *DisconnectQueue) Shutdown() error

Shutdown stops throttling and makes requests one by one

func (*DisconnectQueue) Size

func (d *DisconnectQueue) Size() int

Size returns the number of enqueued tasks

type DisconnectQueueConfig added in v1.0.1

type DisconnectQueueConfig struct {
	// Limit the number of Disconnect RPC calls per second
	Rate int
	// How much time to wait to call all enqueued calls at exit (in seconds)
	ShutdownTimeout int
}

DisconnectQueueConfig contains DisconnectQueue configuration

func NewDisconnectQueueConfig added in v1.0.1

func NewDisconnectQueueConfig() DisconnectQueueConfig

NewDisconnectQueueConfig builds a new config

type Disconnector added in v1.0.1

type Disconnector interface {
	Run() error
	Shutdown() error
	Enqueue(*Session) error
	Size() int
}

Disconnector is an interface for disconnect queue implementation

type EncodingCache added in v1.1.0

type EncodingCache struct {
	// contains filtered or unexported fields
}

func NewEncodingCache added in v1.1.0

func NewEncodingCache() *EncodingCache

func (*EncodingCache) Fetch added in v1.1.0

func (m *EncodingCache) Fetch(
	msg encoders.EncodedMessage,
	encoder string,
	callback EncodingFunction,
) (*ws.SentFrame, error)

type EncodingFunction added in v1.1.0

type EncodingFunction = func(encoders.EncodedMessage) (*ws.SentFrame, error)

type Executor added in v1.1.0

type Executor interface {
	HandleCommand(*Session, *common.Message) error
}

Executor handles incoming commands (messages)

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores all the sessions and the corresponding subscriptions info

func NewHub

func NewHub(poolSize int) *Hub

NewHub builds new hub instance

func (*Hub) AddSession added in v1.0.4

func (h *Hub) AddSession(s *Session)

AddSession enqueues session registration

func (*Hub) AddSubscription added in v1.0.4

func (h *Hub) AddSubscription(sid string, identifier string, stream string)

AddSubscription enqueues adding a subscription for session-identifier pair to the hub

func (*Hub) Broadcast added in v1.0.4

func (h *Hub) Broadcast(stream string, data string)

Broadcast enqueues data broadcasting to a stream

func (*Hub) BroadcastMessage added in v1.0.4

func (h *Hub) BroadcastMessage(msg *common.StreamMessage)

BroadcastMessage enqueues broadcasting a pre-built StreamMessage

func (*Hub) RemoteDisconnect added in v1.0.4

func (h *Hub) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect enqueues remote disconnect command

func (*Hub) RemoveAllSubscriptions added in v1.0.4

func (h *Hub) RemoveAllSubscriptions(sid string, identifier string)

RemoveAllSubscriptions enqueues removing all subscriptions for session-identifier pair from the hub

func (*Hub) RemoveSession added in v1.0.4

func (h *Hub) RemoveSession(s *Session)

RemoveSession enqueues session un-registration

func (*Hub) RemoveSubscription added in v1.0.4

func (h *Hub) RemoveSubscription(sid string, identifier string, stream string)

RemoveSubscription enqueues removing a subscription for session-identifier pair from the hub

func (*Hub) Run

func (h *Hub) Run()

Run makes hub active

func (*Hub) Shutdown

func (h *Hub) Shutdown()

Shutdown sends shutdown command to hub

func (*Hub) Size

func (h *Hub) Size() int

Size returns a number of active sessions

func (*Hub) StreamsSize

func (h *Hub) StreamsSize() int

StreamsSize returns the number of unique streams

func (*Hub) UniqSize

func (h *Hub) UniqSize() int

UniqSize returns the number of unique identifiers

type HubRegistration added in v1.0.4

type HubRegistration struct {
	// contains filtered or unexported fields
}

HubRegistration represents registration event ("add" or "remove")

type HubSubscription added in v1.0.4

type HubSubscription struct {
	// contains filtered or unexported fields
}

HubSubscription contains information about session-channel(-stream) subscription

type Node

type Node struct {
	Metrics *metrics.Metrics
	// contains filtered or unexported fields
}

Node represents the whole application

func NewNode

func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node

NewNode builds new node struct

func (*Node) Authenticate

func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error)

Authenticate calls controller to perform authentication. If authentication is successful, session is registered with a hub.

func (*Node) Broadcast

func (n *Node) Broadcast(msg *common.StreamMessage)

Broadcast message to stream

func (*Node) Disconnect

func (n *Node) Disconnect(s *Session) error

Disconnect adds session to disconnector queue and unregisters session from hub

func (*Node) DisconnectNow

func (n *Node) DisconnectNow(s *Session) error

DisconnectNow executes disconnect on controller

func (*Node) HandleCommand

func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error)

HandleCommand parses incoming message from client and executes the command (if recognized)

func (*Node) HandlePubSub added in v1.0.1

func (n *Node) HandlePubSub(raw []byte)

HandlePubSub parses incoming pubsub message and broadcasts it

func (*Node) Perform

func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Perform executes client command

func (*Node) RemoteDisconnect added in v1.0.1

func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage)

RemoteDisconnect finds a session by identifier and closes it

func (*Node) SetDisconnector added in v1.0.1

func (n *Node) SetDisconnector(d Disconnector)

SetDisconnector sets disconnector for the node

func (*Node) Shutdown

func (n *Node) Shutdown() (err error)

Shutdown stops all services (hub, controller)

func (*Node) Start added in v1.0.1

func (n *Node) Start() error

Start runs all the required goroutines

func (*Node) Subscribe

func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Subscribe subscribes session to a channel

func (*Node) Unsubscribe

func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error)

Unsubscribe unsubscribes session from a channel

type NoopDisconnectQueue added in v1.0.1

type NoopDisconnectQueue struct{}

NoopDisconnectQueue is non-operational disconnect queue implementation

func NewNoopDisconnector added in v1.0.1

func NewNoopDisconnector() *NoopDisconnectQueue

NewNoopDisconnector returns new NoopDisconnectQueue

func (*NoopDisconnectQueue) Enqueue added in v1.0.1

func (d *NoopDisconnectQueue) Enqueue(s *Session) error

Enqueue does nothing

func (*NoopDisconnectQueue) Run added in v1.0.1

func (d *NoopDisconnectQueue) Run() error

Run does nothing

func (*NoopDisconnectQueue) Shutdown added in v1.0.1

func (d *NoopDisconnectQueue) Shutdown() error

Shutdown does nothing

func (*NoopDisconnectQueue) Size added in v1.0.1

func (d *NoopDisconnectQueue) Size() int

Size returns 0

type Session

type Session struct {
	UID         string
	Identifiers string
	Connected   bool
	Log         *log.Entry
	// contains filtered or unexported fields
}

Session represents active client

func NewSession

func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string) *Session

NewSession builds a new Session struct from ws connection and http request

func (*Session) Disconnect

func (s *Session) Disconnect(reason string, code int)

Disconnect schedules connection disconnect

func (*Session) ReadMessage added in v1.1.0

func (s *Session) ReadMessage(message []byte) error

ReadMessage reads messages from ws connection and sends them to node

func (*Session) Send

func (s *Session) Send(msg encoders.EncodedMessage)

Send schedules a data transmission

func (*Session) SendJSONTransmission added in v1.1.0

func (s *Session) SendJSONTransmission(msg string)

SendJSONTransmission is used to propagate the direct transmission to the client (from RPC call result)

func (*Session) SendMessages

func (s *Session) SendMessages()

SendMessages waits for incoming messages and sends them to the client connection

func (*Session) Serve added in v1.1.0

func (s *Session) Serve(callback func()) error

Serve enters a loop to read incoming data

func (*Session) ServeWithPoll

func (s *Session) ServeWithPoll(poller netpoll.Poller, callback func()) error

ServeWithPoll registers the connection within a netpoll and subscribes to Read/Close events

func (*Session) SetEncoder added in v1.1.0

func (s *Session) SetEncoder(enc encoders.Encoder)

func (*Session) SetExecutor added in v1.1.0

func (s *Session) SetExecutor(ex Executor)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL