Documentation ¶
Index ¶
- Constants
- Variables
- func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)
- func Message2Chunks(message []byte, header *base.RtmpHeader) []byte
- type Buffer
- type ChunkComposer
- type ChunkDivider
- type ClientSession
- func (s *ClientSession) AppName() string
- func (s *ClientSession) Dispose() error
- func (s *ClientSession) Do(rawUrl string) error
- func (s *ClientSession) Flush() error
- func (s *ClientSession) GetStat() base.StatSession
- func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)
- func (s *ClientSession) RawQuery() string
- func (s *ClientSession) StreamName() string
- func (s *ClientSession) UniqueKey() string
- func (s *ClientSession) UpdateStat(intervalSec uint32)
- func (s *ClientSession) Url() string
- func (s *ClientSession) WaitChan() <-chan error
- func (s *ClientSession) Write(msg []byte) error
- type ClientSessionOption
- type ClientSessionType
- type HandshakeClientComplex
- type HandshakeClientSimple
- type HandshakeServer
- type IHandshakeClient
- type MessagePacker
- type ModClientSessionOption
- type ModPullSessionOption
- type ModPushSessionOption
- type ObjectPair
- type ObjectPairArray
- type OnCompleteMessage
- type OnReadRtmpAvMsg
- type PubSessionObserver
- type PullSession
- func (s *PullSession) AppName() string
- func (s *PullSession) Dispose() error
- func (s *PullSession) GetStat() base.StatSession
- func (s *PullSession) IsAlive() (readAlive, writeAlive bool)
- func (s *PullSession) Pull(rawUrl string, onReadRtmpAvMsg OnReadRtmpAvMsg) error
- func (s *PullSession) RawQuery() string
- func (s *PullSession) StreamName() string
- func (s *PullSession) UniqueKey() string
- func (s *PullSession) UpdateStat(intervalSec uint32)
- func (s *PullSession) Url() string
- func (s *PullSession) WaitChan() <-chan error
- type PullSessionOption
- type PushSession
- func (s *PushSession) AppName() string
- func (s *PushSession) Dispose() error
- func (s *PushSession) Flush() error
- func (s *PushSession) GetStat() base.StatSession
- func (s *PushSession) IsAlive() (readAlive, writeAlive bool)
- func (s *PushSession) Push(rawUrl string) error
- func (s *PushSession) RawQuery() string
- func (s *PushSession) StreamName() string
- func (s *PushSession) UniqueKey() string
- func (s *PushSession) UpdateStat(intervalSec uint32)
- func (s *PushSession) Url() string
- func (s *PushSession) WaitChan() <-chan error
- func (s *PushSession) Write(msg []byte) error
- type PushSessionOption
- type Server
- func (server *Server) Dispose()
- func (server *Server) Listen() (err error)
- func (server *Server) OnNewRtmpPubSession(session *ServerSession)
- func (server *Server) OnNewRtmpSubSession(session *ServerSession)
- func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
- func (server *Server) RunLoop() error
- type ServerObserver
- type ServerSession
- func (s *ServerSession) AppName() string
- func (s *ServerSession) Dispose() error
- func (s *ServerSession) Flush() error
- func (s *ServerSession) GetStat() base.StatSession
- func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)
- func (s *ServerSession) RawQuery() string
- func (s *ServerSession) RunLoop() (err error)
- func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)
- func (s *ServerSession) StreamName() string
- func (s *ServerSession) UniqueKey() string
- func (s *ServerSession) UpdateStat(intervalSec uint32)
- func (s *ServerSession) Url() string
- func (s *ServerSession) Write(msg []byte) error
- type ServerSessionObserver
- type ServerSessionType
- type Stream
- type StreamMsg
Constants ¶
const ( Amf0TypeMarkerNumber = uint8(0x00) Amf0TypeMarkerBoolean = uint8(0x01) Amf0TypeMarkerString = uint8(0x02) Amf0TypeMarkerObject = uint8(0x03) Amf0TypeMarkerNull = uint8(0x05) Amf0TypeMarkerEcmaArray = uint8(0x08) Amf0TypeMarkerObjectEnd = uint8(0x09) Amf0TypeMarkerLongString = uint8(0x0c) )
const ( CsidAmf = 5 CsidAudio = 6 CsidVideo = 7 )
const ( //MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令 Msid1 = 1 // publish、play、onStatus 以及 音视频数据 )
Variables ¶
var ( ErrAmfInvalidType = errors.New("lal.rtmp: invalid amf0 type") ErrAmfTooShort = errors.New("lal.rtmp: too short to unmarshal amf0 data") ErrAmfNotExist = errors.New("lal.rtmp: not exist") )
var Amf0 amf0
var Amf0TypeMarkerObjectEndBytes = []byte{0, 0, Amf0TypeMarkerObjectEnd}
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
var (
ErrRtmp = errors.New("lal.rtmp: fxxk")
)
var ( // 本端(包括Server Session和Client Session)设置的chunk size,本端发送数据时切割chunk包时使用 // (对端发送数据时的chunk size由对端决定,和本变量没有关系) // // 注意,这个值不应该设置的太小,原因有两方面: // 1. 性能与带宽 // 切割的chunk包过多,会消耗更多的CPU资源(包括本地和远端),另外还可能增加传输时的chunk header带宽消耗 // 2. 兼容性 // 理论上,信令也要参考chunk size切割成chunk包,而对端使用chunk包合成message的实现不一定标准。 // 我就遇到过这样的case,对端认为rtmp握手后的几个信令,每个信令都只使用一个chunk。 // 假如我们将一条信令切割成多个chunk,对端可能就解析错误了,这属于对端实现的问题。 // 但为了更好的兼容性,我们不要将chunk size设置的太小。 // LocalChunkSize = 4096 )
TODO chef 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口
Functions ¶
func BuildMetadata ¶
spec-video_file_format_spec_v10.pdf onMetaData - duration DOUBLE, seconds - width DOUBLE - height DOUBLE - videodatarate DOUBLE - framerate DOUBLE - videocodecid DOUBLE - audiosamplerate DOUBLE - audiosamplesize DOUBLE - stereo BOOL - audiocodecid DOUBLE - filesize DOUBLE, bytes
目前包含的字段: - width - height - audiocodecid - videocodecid - version
width 如果为-1,则metadata中不写入该字段 height 如果为-1,则metadata中不写入该字段 audiocodecid 如果为-1,则metadata中不写入该字段
AAC 10
videocodecid 如果为-1,则metadata中不写入该字段
H264 7 H265 12
@return 返回的内存块为新申请的独立内存块
func Message2Chunks ¶
func Message2Chunks(message []byte, header *base.RtmpHeader) []byte
@return 返回的内存块由内部申请,不依赖参数<message>内存块
Types ¶
type Buffer ¶ added in v0.24.1
type Buffer struct {
// contains filtered or unexported fields
}
func (*Buffer) ModWritePos ¶ added in v0.24.1
type ChunkComposer ¶
type ChunkComposer struct {
// contains filtered or unexported fields
}
func NewChunkComposer ¶
func NewChunkComposer() *ChunkComposer
func (*ChunkComposer) RunLoop ¶
func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error
RunLoop 将rtmp chunk合并为message
@param cb: stream.msg: 注意,回调结束后,`msg`的内存块会被`ChunkComposer`重复使用
也即多次回调的`msg`是复用的同一块内存块 如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝 只在回调中使用`msg`,则不需要拷贝 cb return: 如果cb返回的error不为nil,则`RunLoop`停止阻塞,并返回这个错误
@return 阻塞直到发生错误
TODO chef: msglen支持最大阈值,超过可以认为对端是非法的
func (*ChunkComposer) SetPeerChunkSize ¶
func (c *ChunkComposer) SetPeerChunkSize(val uint32)
type ChunkDivider ¶
type ChunkDivider struct {
// contains filtered or unexported fields
}
func (*ChunkDivider) Message2Chunks ¶
func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte
TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message
type ClientSession ¶
type ClientSession struct {
// contains filtered or unexported fields
}
rtmp 客户端类型连接的底层实现 package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession
func NewClientSession ¶
func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession
t: session的类型,只能是推或者拉
func (*ClientSession) AppName ¶
func (s *ClientSession) AppName() string
func (*ClientSession) Dispose ¶
func (s *ClientSession) Dispose() error
Dispose 文档请参考: IClientSessionLifecycle interface
func (*ClientSession) Do ¶
func (s *ClientSession) Do(rawUrl string) error
阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (*ClientSession) Flush ¶
func (s *ClientSession) Flush() error
func (*ClientSession) GetStat ¶
func (s *ClientSession) GetStat() base.StatSession
func (*ClientSession) IsAlive ¶
func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)
func (*ClientSession) RawQuery ¶
func (s *ClientSession) RawQuery() string
func (*ClientSession) StreamName ¶
func (s *ClientSession) StreamName() string
func (*ClientSession) UniqueKey ¶
func (s *ClientSession) UniqueKey() string
func (*ClientSession) UpdateStat ¶
func (s *ClientSession) UpdateStat(intervalSec uint32)
func (*ClientSession) Url ¶ added in v0.24.1
func (s *ClientSession) Url() string
func (*ClientSession) WaitChan ¶ added in v0.19.12
func (s *ClientSession) WaitChan() <-chan error
WaitChan 文档请参考: IClientSessionLifecycle interface
func (*ClientSession) Write ¶ added in v0.19.12
func (s *ClientSession) Write(msg []byte) error
type ClientSessionOption ¶
type ClientSessionType ¶
type ClientSessionType int
const ( CstPullSession ClientSessionType = iota CstPushSession )
type HandshakeClientComplex ¶
type HandshakeClientComplex struct {
// contains filtered or unexported fields
}
func (*HandshakeClientComplex) ReadS0S1 ¶ added in v0.19.12
func (c *HandshakeClientComplex) ReadS0S1(reader io.Reader) error
func (*HandshakeClientComplex) ReadS2 ¶ added in v0.19.12
func (c *HandshakeClientComplex) ReadS2(reader io.Reader) error
type HandshakeClientSimple ¶
type HandshakeClientSimple struct {
// contains filtered or unexported fields
}
func (*HandshakeClientSimple) ReadS0S1 ¶ added in v0.19.12
func (c *HandshakeClientSimple) ReadS0S1(reader io.Reader) error
func (*HandshakeClientSimple) ReadS2 ¶ added in v0.19.12
func (c *HandshakeClientSimple) ReadS2(reader io.Reader) error
type HandshakeServer ¶
type HandshakeServer struct {
// contains filtered or unexported fields
}
func (*HandshakeServer) WriteS0S1S2 ¶
func (s *HandshakeServer) WriteS0S1S2(writer io.Writer) error
type IHandshakeClient ¶ added in v0.19.12
type MessagePacker ¶
type MessagePacker struct {
// contains filtered or unexported fields
}
打包并发送 rtmp 信令
func NewMessagePacker ¶
func NewMessagePacker() *MessagePacker
func (*MessagePacker) ChunkAndWrite ¶ added in v0.24.1
type ModClientSessionOption ¶
type ModClientSessionOption func(option *ClientSessionOption)
type ModPullSessionOption ¶
type ModPullSessionOption func(option *PullSessionOption)
type ModPushSessionOption ¶
type ModPushSessionOption func(option *PushSessionOption)
type ObjectPair ¶
type ObjectPair struct { Key string Value interface{} }
type ObjectPairArray ¶
type ObjectPairArray []ObjectPair
func ParseMetadata ¶
func ParseMetadata(b []byte) (ObjectPairArray, error)
func (ObjectPairArray) Find ¶
func (o ObjectPairArray) Find(key string) interface{}
func (ObjectPairArray) FindNumber ¶
func (o ObjectPairArray) FindNumber(key string) (int, error)
func (ObjectPairArray) FindString ¶
func (o ObjectPairArray) FindString(key string) (string, error)
type OnCompleteMessage ¶
type OnReadRtmpAvMsg ¶ added in v0.24.1
type PubSessionObserver ¶
type PullSession ¶
type PullSession struct {
// contains filtered or unexported fields
}
func NewPullSession ¶
func NewPullSession(modOptions ...ModPullSessionOption) *PullSession
func (*PullSession) AppName ¶
func (s *PullSession) AppName() string
文档请参考: interface ISessionUrlContext
func (*PullSession) Dispose ¶
func (s *PullSession) Dispose() error
Dispose 文档请参考: IClientSessionLifecycle interface
func (*PullSession) GetStat ¶
func (s *PullSession) GetStat() base.StatSession
文档请参考: interface ISessionStat
func (*PullSession) IsAlive ¶
func (s *PullSession) IsAlive() (readAlive, writeAlive bool)
文档请参考: interface ISessionStat
func (*PullSession) Pull ¶
func (s *PullSession) Pull(rawUrl string, onReadRtmpAvMsg OnReadRtmpAvMsg) error
阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误
@param onReadRtmpAvMsg: msg: 注意,回调结束后,`msg`的内存块会被`PullSession`重复使用
也即多次回调的`msg`是复用的同一块内存块 如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝,比如调用`msg.Clone()` 只在回调中使用`msg`,则不需要拷贝
func (*PullSession) RawQuery ¶
func (s *PullSession) RawQuery() string
文档请参考: interface ISessionUrlContext
func (*PullSession) StreamName ¶
func (s *PullSession) StreamName() string
文档请参考: interface ISessionUrlContext
func (*PullSession) UpdateStat ¶
func (s *PullSession) UpdateStat(intervalSec uint32)
文档请参考: interface ISessionStat
func (*PullSession) Url ¶ added in v0.24.1
func (s *PullSession) Url() string
文档请参考: interface ISessionUrlContext
func (*PullSession) WaitChan ¶ added in v0.19.12
func (s *PullSession) WaitChan() <-chan error
WaitChan 文档请参考: IClientSessionLifecycle interface
type PullSessionOption ¶
type PushSession ¶
type PushSession struct { IsFresh bool // contains filtered or unexported fields }
func NewPushSession ¶
func NewPushSession(modOptions ...ModPushSessionOption) *PushSession
func (*PushSession) AppName ¶
func (s *PushSession) AppName() string
文档请参考: interface ISessionUrlContext
func (*PushSession) Dispose ¶
func (s *PushSession) Dispose() error
Dispose 文档请参考: IClientSessionLifecycle interface
func (*PushSession) GetStat ¶
func (s *PushSession) GetStat() base.StatSession
文档请参考: interface ISessionStat
func (*PushSession) IsAlive ¶
func (s *PushSession) IsAlive() (readAlive, writeAlive bool)
文档请参考: interface ISessionStat
func (*PushSession) Push ¶
func (s *PushSession) Push(rawUrl string) error
阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误
func (*PushSession) RawQuery ¶
func (s *PushSession) RawQuery() string
文档请参考: interface ISessionUrlContext
func (*PushSession) StreamName ¶
func (s *PushSession) StreamName() string
文档请参考: interface ISessionUrlContext
func (*PushSession) UpdateStat ¶
func (s *PushSession) UpdateStat(intervalSec uint32)
文档请参考: interface ISessionStat
func (*PushSession) Url ¶ added in v0.24.1
func (s *PushSession) Url() string
文档请参考: interface ISessionUrlContext
func (*PushSession) WaitChan ¶ added in v0.19.12
func (s *PushSession) WaitChan() <-chan error
WaitChan 文档请参考: IClientSessionLifecycle interface
func (*PushSession) Write ¶ added in v0.19.12
func (s *PushSession) Write(msg []byte) error
发送数据 注意,业务方需将数据打包成rtmp chunk格式后,再调用该函数发送
type PushSessionOption ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(observer ServerObserver, addr string) *Server
func (*Server) OnNewRtmpPubSession ¶ added in v0.24.1
func (server *Server) OnNewRtmpPubSession(session *ServerSession)
ServerSessionObserver
func (*Server) OnNewRtmpSubSession ¶ added in v0.24.1
func (server *Server) OnNewRtmpSubSession(session *ServerSession)
ServerSessionObserver
func (*Server) OnRtmpConnect ¶ added in v0.24.1
func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
ServerSessionObserver
type ServerObserver ¶
type ServerObserver interface { OnRtmpConnect(session *ServerSession, opa ObjectPairArray) OnNewRtmpPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接 OnDelRtmpPubSession(session *ServerSession) OnNewRtmpSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接 OnDelRtmpSubSession(session *ServerSession) }
type ServerSession ¶
type ServerSession struct { // only for SubSession IsFresh bool ShouldWaitVideoKeyFrame bool // contains filtered or unexported fields }
func NewServerSession ¶
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession
func (*ServerSession) AppName ¶
func (s *ServerSession) AppName() string
func (*ServerSession) Dispose ¶
func (s *ServerSession) Dispose() error
func (*ServerSession) Flush ¶
func (s *ServerSession) Flush() error
func (*ServerSession) GetStat ¶
func (s *ServerSession) GetStat() base.StatSession
func (*ServerSession) IsAlive ¶
func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)
func (*ServerSession) RawQuery ¶
func (s *ServerSession) RawQuery() string
func (*ServerSession) RunLoop ¶
func (s *ServerSession) RunLoop() (err error)
func (*ServerSession) SetPubSessionObserver ¶
func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)
func (*ServerSession) StreamName ¶
func (s *ServerSession) StreamName() string
func (*ServerSession) UniqueKey ¶
func (s *ServerSession) UniqueKey() string
func (*ServerSession) UpdateStat ¶
func (s *ServerSession) UpdateStat(intervalSec uint32)
func (*ServerSession) Url ¶ added in v0.24.1
func (s *ServerSession) Url() string
func (*ServerSession) Write ¶ added in v0.19.12
func (s *ServerSession) Write(msg []byte) error
type ServerSessionObserver ¶
type ServerSessionObserver interface { OnRtmpConnect(session *ServerSession, opa ObjectPairArray) OnNewRtmpPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听 OnNewRtmpSubSession(session *ServerSession) }
type ServerSessionType ¶
type ServerSessionType int
const ( ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态 ServerSessionTypePub ServerSessionTypeSub )