Documentation ¶
Index ¶
- Constants
- Variables
- func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, ...) error
- type CustomCleanup
- type LocalNode
- type LocalRouter
- func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error
- func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
- func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error)
- func (r *LocalRouter) Drain()
- func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)
- func (r *LocalRouter) GetRegion() string
- func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)
- func (r *LocalRouter) RegisterNode() error
- func (r *LocalRouter) RemoveDeadNodes(_ CustomCleanup) error
- func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error
- func (r *LocalRouter) Start() error
- func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
- func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, ...) (res StartParticipantSignalResults, err error)
- func (r *LocalRouter) Stop()
- func (r *LocalRouter) UnregisterNode() error
- type MessageChannel
- type MessageRouter
- type MessageSink
- type MessageSource
- type ParticipantInit
- type RedisRouter
- func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error
- func (r *RedisRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
- func (r *RedisRouter) Drain()
- func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)
- func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)
- func (r *RedisRouter) RegisterNode() error
- func (r *RedisRouter) RemoveDeadNodes(customCleanup CustomCleanup) error
- func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
- func (r *RedisRouter) Start() error
- func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
- func (r *RedisRouter) Stop()
- func (r *RedisRouter) UnregisterNode() error
- type RelaySignalMessage
- type RoomManagerClient
- type Router
- type SignalClient
- type SignalMessageReader
- type SignalMessageWriter
- type SignalSinkParams
- type StartParticipantSignalResults
Constants ¶
const ( // hash of node_id => Node proto NodesKey = "nodes" // hash of room_name => node_id NodeRoomKey = "room_node_map" )
const DefaultMessageChannelSize = 200
Variables ¶
var ( ErrNotFound = errors.New("could not find object") ErrIPNotSet = errors.New("ip address is required and not set") ErrHandlerNotDefined = errors.New("handler not defined") ErrIncorrectRTCNode = errors.New("current node isn't the RTC node for the room") ErrNodeNotFound = errors.New("could not locate the node") ErrNodeLimitReached = errors.New("reached configured limit for node") ErrInvalidRouterMessage = errors.New("invalid router message") ErrChannelClosed = errors.New("channel closed") ErrChannelFull = errors.New("channel is full") // errors when starting signal connection ErrRequestChannelClosed = errors.New("request channel closed") ErrCouldNotMigrateParticipant = errors.New("could not migrate participant") ErrClientInfoNotSet = errors.New("client info not set") )
var ErrSignalMessageDropped = errors.New("signal message dropped")
var ErrSignalWriteFailed = errors.New("signal write failed")
Functions ¶
func CopySignalStreamToMessageChannel ¶
func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, reader SignalMessageReader[RecvType], config config.SignalRelayConfig, ) error
Types ¶
type CustomCleanup ¶
type CustomCleanup interface { LockRoom(ctx context.Context, roomName livekit.RoomName, timeout time.Duration) (string, error) UnlockRoom(ctx context.Context, roomName livekit.RoomName, token string) error PublicDeleteRoom(ctx context.Context, roomName livekit.RoomName) error }
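CustomCleanup declares the room lock, unlock, and delete operations that RemoveDeadNodes accepts. It is part of the OpenVidu-specific additions (marked "BEGIN OPENVIDU BLOCK" in the source).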
type LocalRouter ¶
type LocalRouter struct {
// contains filtered or unexported fields
}
LocalRouter is a router of messages on the same node; it is a basic implementation for local testing.
func NewLocalRouter ¶
func NewLocalRouter( currentNode LocalNode, signalClient SignalClient, roomManagerClient RoomManagerClient, ) *LocalRouter
func (*LocalRouter) ClearRoomState ¶
func (*LocalRouter) CreateRoom ¶
func (r *LocalRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
func (*LocalRouter) CreateRoomWithNodeID ¶
func (r *LocalRouter) CreateRoomWithNodeID(ctx context.Context, req *livekit.CreateRoomRequest, nodeID livekit.NodeID) (res *livekit.Room, err error)
func (*LocalRouter) Drain ¶
func (r *LocalRouter) Drain()
func (*LocalRouter) GetNodeForRoom ¶
func (*LocalRouter) GetRegion ¶
func (r *LocalRouter) GetRegion() string
func (*LocalRouter) RegisterNode ¶
func (r *LocalRouter) RegisterNode() error
func (*LocalRouter) RemoveDeadNodes ¶
func (r *LocalRouter) RemoveDeadNodes(_ CustomCleanup) error
Part of the OpenVidu-specific additions (marked "BEGIN OPENVIDU BLOCK" in the source).
func (*LocalRouter) SetNodeForRoom ¶
func (*LocalRouter) Start ¶
func (r *LocalRouter) Start() error
func (*LocalRouter) StartParticipantSignal ¶
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
func (*LocalRouter) StartParticipantSignalWithNodeID ¶
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)
func (*LocalRouter) Stop ¶
func (r *LocalRouter) Stop()
func (*LocalRouter) UnregisterNode ¶
func (r *LocalRouter) UnregisterNode() error
type MessageChannel ¶
type MessageChannel struct {
// contains filtered or unexported fields
}
func NewDefaultMessageChannel ¶
func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel
func NewMessageChannel ¶
func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel
func (*MessageChannel) Close ¶
func (m *MessageChannel) Close()
func (*MessageChannel) ConnectionID ¶
func (m *MessageChannel) ConnectionID() livekit.ConnectionID
func (*MessageChannel) IsClosed ¶
func (m *MessageChannel) IsClosed() bool
func (*MessageChannel) OnClose ¶
func (m *MessageChannel) OnClose(f func())
func (*MessageChannel) ReadChan ¶
func (m *MessageChannel) ReadChan() <-chan proto.Message
func (*MessageChannel) WriteMessage ¶
func (m *MessageChannel) WriteMessage(msg proto.Message) error
type MessageRouter ¶
type MessageRouter interface { // CreateRoom starts an rtc room CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error) // StartParticipantSignal participant signal connection is ready to start StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error) }
type MessageSink ¶
type MessageSink interface { WriteMessage(msg proto.Message) error IsClosed() bool Close() ConnectionID() livekit.ConnectionID }
MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport.
func NewSignalMessageSink ¶
func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink
type MessageSource ¶
type MessageSource interface { // ReadChan exposes a one way channel to make it easier to use with select ReadChan() <-chan proto.Message IsClosed() bool Close() ConnectionID() livekit.ConnectionID }
type ParticipantInit ¶
type ParticipantInit struct { Identity livekit.ParticipantIdentity Name livekit.ParticipantName Reconnect bool ReconnectReason livekit.ReconnectReason AutoSubscribe bool Client *livekit.ClientInfo Grants *auth.ClaimGrants Region string AdaptiveStream bool ID livekit.ParticipantID SubscriberAllowPause *bool DisableICELite bool CreateRoom *livekit.CreateRoomRequest }
func ParticipantInitFromStartSession ¶
func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)
func (*ParticipantInit) ToStartSession ¶
func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)
type RedisRouter ¶
type RedisRouter struct { *LocalRouter // contains filtered or unexported fields }
RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.
func NewRedisRouter ¶
func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.KeepalivePubSub) *RedisRouter
func (*RedisRouter) ClearRoomState ¶
func (*RedisRouter) CreateRoom ¶
func (r *RedisRouter) CreateRoom(ctx context.Context, req *livekit.CreateRoomRequest) (res *livekit.Room, err error)
func (*RedisRouter) Drain ¶
func (r *RedisRouter) Drain()
func (*RedisRouter) GetNodeForRoom ¶
func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)
GetNodeForRoom finds the node on which the room is hosted.
func (*RedisRouter) RegisterNode ¶
func (r *RedisRouter) RegisterNode() error
func (*RedisRouter) RemoveDeadNodes ¶
func (r *RedisRouter) RemoveDeadNodes(customCleanup CustomCleanup) error
Part of the OpenVidu-specific additions (marked "BEGIN OPENVIDU BLOCK" in the source).
func (*RedisRouter) SetNodeForRoom ¶
func (*RedisRouter) Start ¶
func (r *RedisRouter) Start() error
func (*RedisRouter) StartParticipantSignal ¶
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
StartParticipantSignal sets up the signal connection paths to the RTC node and starts routing messages to that message queue.
func (*RedisRouter) Stop ¶
func (r *RedisRouter) Stop()
func (*RedisRouter) UnregisterNode ¶
func (r *RedisRouter) UnregisterNode() error
type RelaySignalMessage ¶
type RoomManagerClient ¶
type RoomManagerClient interface { rpc.TypedRoomManagerClient }
func NewRoomManagerClient ¶
func NewRoomManagerClient(clientParams rpc.ClientParams, config config.RoomConfig) (RoomManagerClient, error)
type Router ¶
type Router interface { MessageRouter RegisterNode() error UnregisterNode() error // BEGIN OPENVIDU BLOCK RemoveDeadNodes(customCleanup CustomCleanup) error ListNodes() ([]*livekit.Node, error) GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error) SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error ClearRoomState(ctx context.Context, roomName livekit.RoomName) error GetRegion() string Start() error Drain() Stop() }
Router allows multiple nodes to coordinate the participant session
func CreateRouter ¶
func CreateRouter( rc redis.UniversalClient, node LocalNode, signalClient SignalClient, roomManagerClient RoomManagerClient, kps rpc.KeepalivePubSub, ) Router
type SignalClient ¶
type SignalClient interface { ActiveCount() int StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) }
func NewSignalClient ¶
func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)
type SignalMessageReader ¶
type SignalMessageReader[RecvType RelaySignalMessage] interface { Read(msg RecvType) ([]proto.Message, error) }
type SignalMessageWriter ¶
type SignalMessageWriter[SendType RelaySignalMessage] interface { Write(seq uint64, close bool, msgs []proto.Message) SendType }
type SignalSinkParams ¶
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct { Stream psrpc.Stream[SendType, RecvType] Logger logger.Logger Config config.SignalRelayConfig Writer SignalMessageWriter[SendType] CloseOnFailure bool BlockOnClose bool ConnectionID livekit.ConnectionID }
type StartParticipantSignalResults ¶
type StartParticipantSignalResults struct { ConnectionID livekit.ConnectionID RequestSink MessageSink ResponseSource MessageSource NodeID livekit.NodeID NodeSelectionReason string }