routing

package
v1.5.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2024 License: Apache-2.0 Imports: 22 Imported by: 10

Documentation

Index

Constants

View Source
// Key names used by the routing package.
// NOTE(review): presumably these are Redis key names used by RedisRouter — confirm against the implementation.
const (

	// NodesKey names a hash of node_id => Node proto.
	NodesKey = "nodes"

	// NodeRoomKey names a hash of room_name => node_id.
	NodeRoomKey = "room_node_map"
)
View Source
// DefaultMessageChannelSize is the default size for a MessageChannel.
// NOTE(review): presumably the size NewDefaultMessageChannel passes to NewMessageChannel — confirm.
const DefaultMessageChannelSize = 200

Variables

View Source
// Sentinel errors exported by the routing package. Each is a distinct
// errors.New value, so callers can compare against them (e.g. with errors.Is).
var (
	ErrNotFound             = errors.New("could not find object")
	ErrIPNotSet             = errors.New("ip address is required and not set")
	ErrHandlerNotDefined    = errors.New("handler not defined")
	ErrIncorrectRTCNode     = errors.New("current node isn't the RTC node for the room")
	ErrNodeNotFound         = errors.New("could not locate the node")
	ErrNodeLimitReached     = errors.New("reached configured limit for node")
	ErrInvalidRouterMessage = errors.New("invalid router message")
	ErrChannelClosed        = errors.New("channel closed")
	ErrChannelFull          = errors.New("channel is full")

	// Errors raised when starting a signal connection.
	ErrRequestChannelClosed       = errors.New("request channel closed")
	ErrCouldNotMigrateParticipant = errors.New("could not migrate participant")
	ErrClientInfoNotSet           = errors.New("client info not set")
)
View Source
// ErrSignalMessageDropped is the sentinel error for a dropped signal message.
// NOTE(review): which code path returns it is not visible here — confirm.
var ErrSignalMessageDropped = errors.New("signal message dropped")
View Source
// ErrSignalWriteFailed is the sentinel error for a failed signal write.
// NOTE(review): which code path returns it is not visible here — confirm.
var ErrSignalWriteFailed = errors.New("signal write failed")

Functions

func CopySignalStreamToMessageChannel added in v1.4.2

func CopySignalStreamToMessageChannel[SendType, RecvType RelaySignalMessage](
	stream psrpc.Stream[SendType, RecvType],
	ch *MessageChannel,
	reader SignalMessageReader[RecvType],
	config config.SignalRelayConfig,
) error

Types

type LocalNode

// LocalNode is a named pointer type whose underlying type is *livekit.Node.
// NewLocalNode builds one from a config, and NewLocalRouter / CreateRouter accept it.
type LocalNode *livekit.Node

func NewLocalNode

func NewLocalNode(conf *config.Config) (LocalNode, error)

type LocalRouter

type LocalRouter struct {
	// contains filtered or unexported fields
}

LocalRouter is a router of messages on the same node; it is a basic implementation for local testing.

func NewLocalRouter

func NewLocalRouter(currentNode LocalNode, signalClient SignalClient) *LocalRouter

func (*LocalRouter) ClearRoomState

func (r *LocalRouter) ClearRoomState(_ context.Context, _ livekit.RoomName) error

func (*LocalRouter) Drain added in v0.13.5

func (r *LocalRouter) Drain()

func (*LocalRouter) GetNode

func (r *LocalRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*LocalRouter) GetNodeForRoom

func (r *LocalRouter) GetNodeForRoom(_ context.Context, _ livekit.RoomName) (*livekit.Node, error)

func (*LocalRouter) GetRegion added in v0.15.7

func (r *LocalRouter) GetRegion() string

func (*LocalRouter) ListNodes

func (r *LocalRouter) ListNodes() ([]*livekit.Node, error)

func (*LocalRouter) RegisterNode

func (r *LocalRouter) RegisterNode() error

func (*LocalRouter) RemoveDeadNodes

func (r *LocalRouter) RemoveDeadNodes() error

func (*LocalRouter) SetNodeForRoom

func (r *LocalRouter) SetNodeForRoom(_ context.Context, _ livekit.RoomName, _ livekit.NodeID) error

func (*LocalRouter) Start

func (r *LocalRouter) Start() error

func (*LocalRouter) StartParticipantSignal

func (r *LocalRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

func (*LocalRouter) StartParticipantSignalWithNodeID added in v1.4.0

func (r *LocalRouter) StartParticipantSignalWithNodeID(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (res StartParticipantSignalResults, err error)

func (*LocalRouter) Stop

func (r *LocalRouter) Stop()

func (*LocalRouter) UnregisterNode

func (r *LocalRouter) UnregisterNode() error

type MessageChannel

type MessageChannel struct {
	// contains filtered or unexported fields
}

func NewDefaultMessageChannel added in v1.4.0

func NewDefaultMessageChannel(connectionID livekit.ConnectionID) *MessageChannel

func NewMessageChannel

func NewMessageChannel(connectionID livekit.ConnectionID, size int) *MessageChannel

func (*MessageChannel) Close

func (m *MessageChannel) Close()

func (*MessageChannel) ConnectionID added in v1.4.4

func (m *MessageChannel) ConnectionID() livekit.ConnectionID

func (*MessageChannel) IsClosed added in v0.13.1

func (m *MessageChannel) IsClosed() bool

func (*MessageChannel) OnClose

func (m *MessageChannel) OnClose(f func())

func (*MessageChannel) ReadChan

func (m *MessageChannel) ReadChan() <-chan proto.Message

func (*MessageChannel) WriteMessage

func (m *MessageChannel) WriteMessage(msg proto.Message) error

type MessageRouter added in v0.14.2

// MessageRouter is the single-method interface for starting a participant's
// signal connection. It is embedded by Router.
type MessageRouter interface {
	// StartParticipantSignal is called when the participant signal connection is ready to start.
	// It takes the room name and the participant's init parameters and returns
	// a StartParticipantSignalResults value or an error.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)
}

type MessageSink

// MessageSink is an abstraction for writing protobuf messages and having them
// read by a MessageSource, potentially on a different node via a transport.
type MessageSink interface {
	// WriteMessage writes msg to the sink and returns an error on failure.
	WriteMessage(msg proto.Message) error
	// IsClosed presumably reports whether Close has taken effect — confirm against implementations.
	IsClosed() bool
	// Close closes the sink; it returns no error.
	Close()
	// ConnectionID returns the livekit.ConnectionID associated with this sink.
	ConnectionID() livekit.ConnectionID
}

MessageSink is an abstraction for writing protobuf messages and having them read by a MessageSource, potentially on a different node via a transport.

func NewSignalMessageSink added in v1.4.2

func NewSignalMessageSink[SendType, RecvType RelaySignalMessage](params SignalSinkParams[SendType, RecvType]) MessageSink

type MessageSource

// MessageSource is the reading counterpart of MessageSink: it hands out
// protobuf messages through a receive-only channel.
type MessageSource interface {
	// ReadChan exposes a one-way (receive-only) channel to make it easier to use with select.
	ReadChan() <-chan proto.Message
	// IsClosed presumably reports whether Close has taken effect — confirm against implementations.
	IsClosed() bool
	// Close closes the source; it returns no error.
	Close()
	// ConnectionID returns the livekit.ConnectionID associated with this source.
	ConnectionID() livekit.ConnectionID
}

type ParticipantInit

// ParticipantInit bundles the per-participant parameters passed to
// StartParticipantSignal. It can be built from a *livekit.StartSession with
// ParticipantInitFromStartSession and converted back with ToStartSession.
type ParticipantInit struct {
	Identity             livekit.ParticipantIdentity
	Name                 livekit.ParticipantName
	Reconnect            bool
	ReconnectReason      livekit.ReconnectReason
	AutoSubscribe        bool
	Client               *livekit.ClientInfo
	Grants               *auth.ClaimGrants
	Region               string
	AdaptiveStream       bool
	ID                   livekit.ParticipantID
	// SubscriberAllowPause is a pointer, so it may be nil.
	// NOTE(review): presumably nil means "not specified" — confirm.
	SubscriberAllowPause *bool
}

func ParticipantInitFromStartSession added in v0.15.7

func ParticipantInitFromStartSession(ss *livekit.StartSession, region string) (*ParticipantInit, error)

func (*ParticipantInit) ToStartSession added in v0.15.7

func (pi *ParticipantInit) ToStartSession(roomName livekit.RoomName, connectionID livekit.ConnectionID) (*livekit.StartSession, error)

type RedisRouter

type RedisRouter struct {
	*LocalRouter
	// contains filtered or unexported fields
}

RedisRouter uses Redis pub/sub to route signaling messages across different nodes. It relies on the RTC node to be the primary driver of the participant connection.

func NewRedisRouter

func NewRedisRouter(lr *LocalRouter, rc redis.UniversalClient, kps rpc.KeepalivePubSub) *RedisRouter

func (*RedisRouter) ClearRoomState

func (r *RedisRouter) ClearRoomState(_ context.Context, roomName livekit.RoomName) error

func (*RedisRouter) Drain added in v0.13.5

func (r *RedisRouter) Drain()

func (*RedisRouter) GetNode

func (r *RedisRouter) GetNode(nodeID livekit.NodeID) (*livekit.Node, error)

func (*RedisRouter) GetNodeForRoom

func (r *RedisRouter) GetNodeForRoom(_ context.Context, roomName livekit.RoomName) (*livekit.Node, error)

func (*RedisRouter) ListNodes

func (r *RedisRouter) ListNodes() ([]*livekit.Node, error)

func (*RedisRouter) RegisterNode

func (r *RedisRouter) RegisterNode() error

func (*RedisRouter) RemoveDeadNodes

func (r *RedisRouter) RemoveDeadNodes() error

func (*RedisRouter) SetNodeForRoom

func (r *RedisRouter) SetNodeForRoom(_ context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error

func (*RedisRouter) Start

func (r *RedisRouter) Start() error

func (*RedisRouter) StartParticipantSignal

func (r *RedisRouter) StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit) (res StartParticipantSignalResults, err error)

StartParticipantSignal sets up the signal connection paths to the RTC node and starts routing messages to that node's message queue.

func (*RedisRouter) Stop

func (r *RedisRouter) Stop()

func (*RedisRouter) UnregisterNode

func (r *RedisRouter) UnregisterNode() error

type RelaySignalMessage added in v1.4.2

// RelaySignalMessage is a proto.Message that additionally exposes a sequence
// number and a close flag. It is used as the type constraint for the generic
// signal relay helpers in this package (CopySignalStreamToMessageChannel,
// NewSignalMessageSink, SignalMessageReader, SignalMessageWriter, SignalSinkParams).
type RelaySignalMessage interface {
	proto.Message
	// GetSeq returns the message's sequence number as a uint64.
	GetSeq() uint64
	// GetClose returns the message's close flag.
	GetClose() bool
}

type Router

// Router allows multiple nodes to coordinate the participant session.
//
// It embeds MessageRouter (StartParticipantSignal) and adds node registration,
// room-to-node mapping, and lifecycle methods. LocalRouter and RedisRouter
// both declare this method set; CreateRouter returns a Router.
type Router interface {
	MessageRouter

	// Node registration for the current node.
	RegisterNode() error
	UnregisterNode() error
	RemoveDeadNodes() error

	// ListNodes returns the known nodes.
	ListNodes() ([]*livekit.Node, error)

	// Room <-> node mapping.
	GetNodeForRoom(ctx context.Context, roomName livekit.RoomName) (*livekit.Node, error)
	// The parameter is spelled nodeID (not nodeId) to follow Go's initialism
	// convention and to match the implementations' signatures.
	SetNodeForRoom(ctx context.Context, roomName livekit.RoomName, nodeID livekit.NodeID) error
	ClearRoomState(ctx context.Context, roomName livekit.RoomName) error

	// GetRegion returns the region as a string.
	GetRegion() string

	// Lifecycle.
	Start() error
	Drain()
	Stop()
}

Router allows multiple nodes to coordinate the participant session

func CreateRouter added in v0.13.5

func CreateRouter(rc redis.UniversalClient, node LocalNode, signalClient SignalClient, kps rpc.KeepalivePubSub) Router

type SignalClient added in v1.4.0

// SignalClient starts participant signal connections against a specific node.
// NewSignalClient constructs one; NewLocalRouter and CreateRouter accept one.
type SignalClient interface {
	// ActiveCount returns a count as an int.
	// NOTE(review): presumably the number of currently active signal connections — confirm.
	ActiveCount() int
	// StartParticipantSignal starts a signal connection for the participant in
	// roomName on the node identified by nodeID. On success it returns the
	// connection ID, a sink for requests, and a source for responses.
	StartParticipantSignal(ctx context.Context, roomName livekit.RoomName, pi ParticipantInit, nodeID livekit.NodeID) (connectionID livekit.ConnectionID, reqSink MessageSink, resSource MessageSource, err error)
}

func NewSignalClient added in v1.4.0

func NewSignalClient(nodeID livekit.NodeID, bus psrpc.MessageBus, config config.SignalRelayConfig) (SignalClient, error)

type SignalMessageReader added in v1.4.2

// SignalMessageReader turns a received relay message into protobuf messages.
// CopySignalStreamToMessageChannel takes one as its reader argument.
type SignalMessageReader[RecvType RelaySignalMessage] interface {
	// Read takes one received relay message and returns a slice of
	// proto.Message values, or an error.
	Read(msg RecvType) ([]proto.Message, error)
}

type SignalMessageWriter added in v1.4.2

// SignalMessageWriter builds an outgoing relay message of type SendType.
// SignalSinkParams carries one in its Writer field.
type SignalMessageWriter[SendType RelaySignalMessage] interface {
	// Write returns a SendType built from a sequence number, a close flag,
	// and a batch of protobuf messages.
	// NOTE(review): seq and close presumably surface via GetSeq/GetClose on the result — confirm.
	Write(seq uint64, close bool, msgs []proto.Message) SendType
}

type SignalSinkParams added in v1.4.2

// SignalSinkParams is the parameter struct accepted by NewSignalMessageSink.
type SignalSinkParams[SendType, RecvType RelaySignalMessage] struct {
	// Stream is the psrpc stream carrying SendType / RecvType messages.
	Stream         psrpc.Stream[SendType, RecvType]
	Logger         logger.Logger
	Config         config.SignalRelayConfig
	// Writer builds the outgoing SendType messages.
	Writer         SignalMessageWriter[SendType]
	// NOTE(review): exact semantics of CloseOnFailure and BlockOnClose are not
	// visible here — confirm against the sink implementation.
	CloseOnFailure bool
	BlockOnClose   bool
	ConnectionID   livekit.ConnectionID
}

type StartParticipantSignalResults added in v1.5.2

// StartParticipantSignalResults is the result type returned by
// StartParticipantSignal on MessageRouter, LocalRouter, and RedisRouter.
type StartParticipantSignalResults struct {
	// ConnectionID, RequestSink, and ResponseSource appear to mirror the values
	// returned by SignalClient.StartParticipantSignal — confirm.
	ConnectionID        livekit.ConnectionID
	RequestSink         MessageSink
	ResponseSource      MessageSource
	// NodeID identifies a node; NodeSelectionReason is a free-form string.
	// NOTE(review): presumably the node chosen for the session and why — confirm.
	NodeID              livekit.NodeID
	NodeSelectionReason string
}

Directories

Path Synopsis
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL