Documentation ¶
Index ¶
- Constants
- Variables
- func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, ...) error
- func ParticipantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey
- func ParticipantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey
- type LocalNode
- type LocalRouter
- func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error
- func (r *LocalRouter) Drain()
- func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)
- func (r *LocalRouter) GetRegion() string
- func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)
- func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback)
- func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback)
- func (r *LocalRouter) RegisterNode() error
- func (r *LocalRouter) RemoveDeadNodes() error
- func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error
- func (r *LocalRouter) Start() error
- func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, ...)
- func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, ...) (connectionID livekit.ConnectionID, reqSink MessageSink, ...)
- func (r *LocalRouter) Stop()
- func (r *LocalRouter) UnregisterNode() error
- func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error
- func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, ...) error
- func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
- type MessageChannel
- type MessageRouter
- type MessageSink
- type MessageSource
- type NewParticipantCallback
- type ParticipantInit
- type RTCMessageCallback
- type RTCNodeSink
- type RedisRouter
- func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error
- func (r *RedisRouter) Drain()
- func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)
- func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)
- func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)
- func (r *RedisRouter) RegisterNode() error
- func (r *RedisRouter) RemoveDeadNodes() error
- func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
- func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, ...) error
- func (r *RedisRouter) Start() error
- func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, ...)
- func (r *RedisRouter) Stop()
- func (r *RedisRouter) UnregisterNode() error
- func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error
- func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, ...) error
- func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
- type RelaySignalMessage
- type Router
- type SignalClient
- type SignalMessageReader
- type SignalMessageWriter
- type SignalNodeSink
- type SignalSinkParams
Constants ¶
View Source
const ( // hash of node_id => Node proto NodesKey = "nodes" // hash of room_name => node_id NodeRoomKey = "room_node_map" )
View Source
const DefaultMessageChannelSize = 200
Variables ¶
View Source
var ( ErrNotFound = errors.New("could not find object") ErrIPNotSet = errors.New("ip address is required and not set") ErrHandlerNotDefined = errors.New("handler not defined") ErrIncorrectRTCNode = errors.New("current node isn't the RTC node for the room") ErrNodeNotFound = errors.New("could not locate the node") ErrNodeLimitReached = errors.New("reached configured limit for node") ErrInvalidRouterMessage = errors.New("invalid router message") ErrChannelClosed = errors.New("channel closed") ErrChannelFull = errors.New("channel is full") )
View Source
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
var ErrSignalWriteFailed = errors.New("signal write failed")
Functions ¶
func CopySignalStreamToMessageChannel ¶
func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage]( stream psrpc.Stream[SendType, RecvType], ch *MessageChannel, reader SignalMessageReader[RecvType], config config.SignalRelayConfig, ) error
func ParticipantKey ¶
func ParticipantKey(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey
func ParticipantKeyLegacy ¶
func ParticipantKeyLegacy(roomName livekit.RoomName, identity livekit.ParticipantIdentity) livekit.ParticipantKey
Types ¶
type LocalRouter ¶
type LocalRouter struct {
// contains filtered or unexported fields
}
a router of messages on the same node, basic implementation for local testing
func NewLocalRouter ¶
func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter
func (*LocalRouter) ClearRoomState ¶
func (*LocalRouter) Drain ¶
func (r *LocalRouter) Drain()
func (*LocalRouter) GetNodeForRoom ¶
func (*LocalRouter) GetRegion ¶
func (r *LocalRouter) GetRegion() string
func (*LocalRouter) OnNewParticipantRTC ¶
func (r *LocalRouter) OnNewParticipantRTC(callback NewParticipantCallback)
func (*LocalRouter) OnRTCMessage ¶
func (r *LocalRouter) OnRTCMessage(callback RTCMessageCallback)
func (*LocalRouter) RegisterNode ¶
func (r *LocalRouter) RegisterNode() error
func (*LocalRouter) RemoveDeadNodes ¶
func (r *LocalRouter) RemoveDeadNodes() error
func (*LocalRouter) SetNodeForRoom ¶
func (*LocalRouter) Start ¶
func (r *LocalRouter) Start() error
func (*LocalRouter) StartParticipantSignal ¶
func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
func (*LocalRouter) StartParticipantSignalWithNodeID ¶
func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
func (*LocalRouter) Stop ¶
func (r *LocalRouter) Stop()
func (*LocalRouter) UnregisterNode ¶
func (r *LocalRouter) UnregisterNode() error
func (*LocalRouter) WriteNodeRTC ¶
func (r *LocalRouter) WriteNodeRTC(_ context.Context, _ string, msg *livekit.RTCNodeMessage) error
func (*LocalRouter) WriteParticipantRTC ¶
func (r *LocalRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
func (*LocalRouter) WriteRoomRTC ¶
func (r *LocalRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
type MessageChannel ¶
type MessageChannel struct {
// contains filtered or unexported fields
}
func NewDefaultMessageChannel ¶
func NewDefaultMessageChannel() *MessageChannel
func NewMessageChannel ¶
func NewMessageChannel(size int) *MessageChannel
func (*MessageChannel) Close ¶
func (m *MessageChannel) Close()
func (*MessageChannel) IsClosed ¶
func (m *MessageChannel) IsClosed() bool
func (*MessageChannel) OnClose ¶
func (m *MessageChannel) OnClose(f func())
func (*MessageChannel) ReadChan ¶
func (m *MessageChannel) ReadChan() <-chan proto.Message
func (*MessageChannel) WriteMessage ¶
func (m *MessageChannel) WriteMessage(msg proto.Message) error
type MessageRouter ¶
type MessageRouter interface { // StartParticipantSignal participant signal connection is ready to start StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) // Write a message to a participant or room WriteParticipantRTC(ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error }
type MessageSink ¶
MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport
func NewSignalMessageSink ¶
func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink
type MessageSource ¶
type NewParticipantCallback ¶
type NewParticipantCallback func( ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, requestSource MessageSource, responseSink MessageSink, ) error
type ParticipantInit ¶
type ParticipantInit struct { Identity livekit.ParticipantIdentity Name livekit.ParticipantName Reconnect bool ReconnectReason livekit.ReconnectReason AutoSubscribe bool Client *livekit.ClientInfo Grants *auth.ClaimGrants Region string AdaptiveStream bool ID livekit.ParticipantID SubscriberAllowPause *bool }
func ParticipantInitFromStartSession ¶
func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)
func (*ParticipantInit) ToStartSession ¶
func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)
type RTCMessageCallback ¶
type RTCMessageCallback func( ctx context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage, )
type RTCNodeSink ¶
type RTCNodeSink struct {
// contains filtered or unexported fields
}
func NewRTCNodeSink ¶
func NewRTCNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey) *RTCNodeSink
func (*RTCNodeSink) Close ¶
func (s *RTCNodeSink) Close()
func (*RTCNodeSink) IsClosed ¶
func (s *RTCNodeSink) IsClosed() bool
func (*RTCNodeSink) OnClose ¶
func (s *RTCNodeSink) OnClose(f func())
func (*RTCNodeSink) WriteMessage ¶
func (s *RTCNodeSink) WriteMessage(msg proto.Message) error
type RedisRouter ¶
type RedisRouter struct { *LocalRouter // contains filtered or unexported fields }
RedisRouter uses Redis pub/sub to route signaling messages across different nodes It relies on the RTC node to be the primary driver of the participant connection. Because
func NewRedisRouter ¶
func NewRedisRouter(config *config.Config, lr *LocalRouter, rc redis.UniversalClient) *RedisRouter
func (*RedisRouter) ClearRoomState ¶
func (*RedisRouter) Drain ¶
func (r *RedisRouter) Drain()
func (*RedisRouter) GetNodeForRoom ¶
func (*RedisRouter) RegisterNode ¶
func (r *RedisRouter) RegisterNode() error
func (*RedisRouter) RemoveDeadNodes ¶
func (r *RedisRouter) RemoveDeadNodes() error
func (*RedisRouter) SetNodeForRoom ¶
func (*RedisRouter) SetParticipantRTCNode ¶
func (r *RedisRouter) SetParticipantRTCNode(participantKey livekit.ParticipantKey, participantKeyB62 livekit.ParticipantKey, nodeID string) error
func (*RedisRouter) Start ¶
func (r *RedisRouter) Start() error
func (*RedisRouter) StartParticipantSignal ¶
func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
StartParticipantSignal signal connection sets up paths to the RTC node, and starts to route messages to that message queue
func (*RedisRouter) Stop ¶
func (r *RedisRouter) Stop()
func (*RedisRouter) UnregisterNode ¶
func (r *RedisRouter) UnregisterNode() error
func (*RedisRouter) WriteNodeRTC ¶
func (r *RedisRouter) WriteNodeRTC(_ context.Context, rtcNodeID string, msg *livekit.RTCNodeMessage) error
func (*RedisRouter) WriteParticipantRTC ¶
func (r *RedisRouter) WriteParticipantRTC(_ context.Context, roomName livekit.RoomName, identity livekit.ParticipantIdentity, msg *livekit.RTCNodeMessage) error
func (*RedisRouter) WriteRoomRTC ¶
func (r *RedisRouter) WriteRoomRTC(ctx context.Context, roomName livekit.RoomName, msg *livekit.RTCNodeMessage) error
type RelaySignalMessage ¶
type Router ¶
type Router interface { MessageRouter RegisterNode() error UnregisterNode() error RemoveDeadNodes() error ListNodes() ([]*livekit.Node, error) GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error) SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeId livekit.NodeID) error ClearRoomState(ctx context.Context, roomName livekit.RoomName) error GetRegion() string Start() error Drain() Stop() // OnNewParticipantRTC is called to start a new participant's RTC connection OnNewParticipantRTC(callback NewParticipantCallback) // OnRTCMessage is called to execute actions on the RTC node OnRTCMessage(callback RTCMessageCallback) }
Router allows multiple nodes to coordinate the participant session
func CreateRouter ¶
func CreateRouter(config *config.Config, rc redis.UniversalClient, node LocalNode, signalClient SignalClient) Router
type SignalClient ¶
type SignalClient interface { ActiveCount() int StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error) }
func NewSignalClient ¶
func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)
type SignalMessageReader ¶
type SignalMessageReader[RecvType RelaySignalMessage] interface { Read(msg RecvType) ([]proto.Message, error) }
type SignalMessageWriter ¶
type SignalMessageWriter[SendType RelaySignalMessage] interface { Write(seq uint64, close bool, msgs []proto.Message) SendType }
type SignalNodeSink ¶
type SignalNodeSink struct {
// contains filtered or unexported fields
}
func NewSignalNodeSink ¶
func NewSignalNodeSink(rc redis.UniversalClient, nodeID livekit.NodeID, connectionID livekit.ConnectionID) *SignalNodeSink
func (*SignalNodeSink) Close ¶
func (s *SignalNodeSink) Close()
func (*SignalNodeSink) IsClosed ¶
func (s *SignalNodeSink) IsClosed() bool
func (*SignalNodeSink) OnClose ¶
func (s *SignalNodeSink) OnClose(f func())
func (*SignalNodeSink) WriteMessage ¶
func (s *SignalNodeSink) WriteMessage(msg proto.Message) error
type SignalSinkParams ¶
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct { Stream psrpc.Stream[SendType, RecvType] Logger logger.Logger Config config.SignalRelayConfig Writer SignalMessageWriter[SendType] CloseOnFailure bool BlockOnClose bool }
Source Files ¶
Click to show internal directories.
Click to hide internal directories.