Documentation ¶
Index ¶
- Constants
- Variables
- func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)
- func Message2Chunks(message []byte, header *base.RTMPHeader) []byte
- type ChunkComposer
- type ChunkDivider
- type ClientSession
- func (s *ClientSession) AppName() string
- func (s *ClientSession) AsyncWrite(msg []byte) error
- func (s *ClientSession) Dispose()
- func (s *ClientSession) Do(rawURL string) error
- func (s *ClientSession) Flush() error
- func (s *ClientSession) GetStat() base.StatSession
- func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)
- func (s *ClientSession) RawQuery() string
- func (s *ClientSession) StreamName() string
- func (s *ClientSession) UpdateStat(interval uint32)
- func (s *ClientSession) Wait() <-chan error
- type ClientSessionOption
- type ClientSessionType
- type HandshakeClient
- type HandshakeClientComplex
- type HandshakeClientSimple
- type HandshakeServer
- type MessagePacker
- type ModClientSessionOption
- type ModPullSessionOption
- type ModPushSessionOption
- type ObjectPair
- type ObjectPairArray
- type OnCompleteMessage
- type OnReadRTMPAVMsg
- type PubSessionObserver
- type PullSession
- func (s *PullSession) AppName() string
- func (s *PullSession) Dispose()
- func (s *PullSession) GetStat() base.StatSession
- func (s *PullSession) IsAlive() (readAlive, writeAlive bool)
- func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error
- func (s *PullSession) RawQuery() string
- func (s *PullSession) StreamName() string
- func (s *PullSession) UniqueKey() string
- func (s *PullSession) UpdateStat(interval uint32)
- func (s *PullSession) Wait() <-chan error
- type PullSessionOption
- type PushSession
- func (s *PushSession) AppName() string
- func (s *PushSession) AsyncWrite(msg []byte) error
- func (s *PushSession) Dispose()
- func (s *PushSession) Flush() error
- func (s *PushSession) GetStat() base.StatSession
- func (s *PushSession) IsAlive() (readAlive, writeAlive bool)
- func (s *PushSession) Push(rawURL string) error
- func (s *PushSession) RawQuery() string
- func (s *PushSession) StreamName() string
- func (s *PushSession) UniqueKey() string
- func (s *PushSession) UpdateStat(interval uint32)
- func (s *PushSession) Wait() <-chan error
- type PushSessionOption
- type Server
- func (server *Server) Dispose()
- func (server *Server) Listen() (err error)
- func (server *Server) OnNewRTMPPubSession(session *ServerSession)
- func (server *Server) OnNewRTMPSubSession(session *ServerSession)
- func (server *Server) OnRTMPConnect(session *ServerSession, opa ObjectPairArray)
- func (server *Server) RunLoop() error
- type ServerObserver
- type ServerSession
- func (s *ServerSession) AppName() string
- func (s *ServerSession) AsyncWrite(msg []byte) error
- func (s *ServerSession) Dispose()
- func (s *ServerSession) Flush() error
- func (s *ServerSession) GetStat() base.StatSession
- func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)
- func (s *ServerSession) RawQuery() string
- func (s *ServerSession) RemoteAddr() string
- func (s *ServerSession) RunLoop() (err error)
- func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)
- func (s *ServerSession) StreamName() string
- func (s *ServerSession) URL() string
- func (s *ServerSession) UpdateStat(interval uint32)
- type ServerSessionObserver
- type ServerSessionType
- type Stream
- type StreamMsg
Constants ¶
const ( AMF0TypeMarkerNumber = uint8(0x00) AMF0TypeMarkerBoolean = uint8(0x01) AMF0TypeMarkerString = uint8(0x02) AMF0TypeMarkerObject = uint8(0x03) AMF0TypeMarkerNull = uint8(0x05) AMF0TypeMarkerEcmaArray = uint8(0x08) AMF0TypeMarkerObjectEnd = uint8(0x09) AMF0TypeMarkerLongString = uint8(0x0c) )
const ( CSIDAMF = 5 CSIDAudio = 6 CSIDVideo = 7 )
const ( //MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令 MSID1 = 1 // publish、play、onStatus 以及 音视频数据 )
Variables ¶
var ( ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type") ErrAMFTooShort = errors.New("lal.rtmp: too short to unmarshal amf0 data") ErrAMFNotExist = errors.New("lal.rtmp: not exist") )
var AMF0 amf0
var AMF0TypeMarkerObjectEndBytes = []byte{0, 0, AMF0TypeMarkerObjectEnd}
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
var ErrRTMP = errors.New("lal.rtmp: fxxk")
var (
LocalChunkSize = 4096 // 本端设置的 chunk size
)
TODO chef 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口
Functions ¶
func BuildMetadata ¶
spec-video_file_format_spec_v10.pdf onMetaData
- duration DOUBLE, seconds
- width DOUBLE
- height DOUBLE
- videodatarate DOUBLE
- framerate DOUBLE
- videocodecid DOUBLE
- audiosamplerate DOUBLE
- audiosamplesize DOUBLE
- stereo BOOL
- audiocodecid DOUBLE
- filesize DOUBLE, bytes
目前包含的字段:
- width
- height
- audiocodecid
- videocodecid
- version
width 如果为-1,则metadata中不写入该字段
height 如果为-1,则metadata中不写入该字段
audiocodecid 如果为-1,则metadata中不写入该字段
  AAC 10
videocodecid 如果为-1,则metadata中不写入该字段
  H264 7
  H265 12
@return 返回的内存块为新申请的独立内存块
func Message2Chunks ¶
func Message2Chunks(message []byte, header *base.RTMPHeader) []byte
@return 返回的内存块由内部申请,不依赖参数<message>内存块
Types ¶
type ChunkComposer ¶
type ChunkComposer struct {
// contains filtered or unexported fields
}
func NewChunkComposer ¶
func NewChunkComposer() *ChunkComposer
func (*ChunkComposer) RunLoop ¶
func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error
cb 回调结束后,内存块会被 ChunkComposer 再次使用
func (*ChunkComposer) SetPeerChunkSize ¶
func (c *ChunkComposer) SetPeerChunkSize(val uint32)
type ChunkDivider ¶
type ChunkDivider struct {
// contains filtered or unexported fields
}
func (*ChunkDivider) Message2Chunks ¶
func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RTMPHeader) []byte
TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message
type ClientSession ¶
type ClientSession struct { UniqueKey string // contains filtered or unexported fields }
rtmp 客户端类型连接的底层实现。package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession。
func NewClientSession ¶
func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession
t: session的类型,只能是推或者拉
func (*ClientSession) AppName ¶
func (s *ClientSession) AppName() string
func (*ClientSession) AsyncWrite ¶
func (s *ClientSession) AsyncWrite(msg []byte) error
func (*ClientSession) Dispose ¶
func (s *ClientSession) Dispose()
func (*ClientSession) Do ¶
func (s *ClientSession) Do(rawURL string) error
阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误
func (*ClientSession) Flush ¶
func (s *ClientSession) Flush() error
func (*ClientSession) GetStat ¶
func (s *ClientSession) GetStat() base.StatSession
func (*ClientSession) IsAlive ¶
func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)
func (*ClientSession) RawQuery ¶
func (s *ClientSession) RawQuery() string
func (*ClientSession) StreamName ¶
func (s *ClientSession) StreamName() string
func (*ClientSession) UpdateStat ¶
func (s *ClientSession) UpdateStat(interval uint32)
type ClientSessionOption ¶
type ClientSessionType ¶
type ClientSessionType int
const ( CSTPullSession ClientSessionType = iota CSTPushSession )
type HandshakeClient ¶
type HandshakeClientComplex ¶
type HandshakeClientComplex struct {
// contains filtered or unexported fields
}
func (*HandshakeClientComplex) ReadS0S1S2 ¶
func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error
type HandshakeClientSimple ¶
type HandshakeClientSimple struct {
// contains filtered or unexported fields
}
func (*HandshakeClientSimple) ReadS0S1S2 ¶
func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error
type HandshakeServer ¶
type HandshakeServer struct {
// contains filtered or unexported fields
}
func (*HandshakeServer) WriteS0S1S2 ¶
func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error
type MessagePacker ¶
type MessagePacker struct {
// contains filtered or unexported fields
}
func NewMessagePacker ¶
func NewMessagePacker() *MessagePacker
type ModClientSessionOption ¶
type ModClientSessionOption func(option *ClientSessionOption)
type ModPullSessionOption ¶
type ModPullSessionOption func(option *PullSessionOption)
type ModPushSessionOption ¶
type ModPushSessionOption func(option *PushSessionOption)
type ObjectPair ¶
type ObjectPair struct { Key string Value interface{} }
type ObjectPairArray ¶
type ObjectPairArray []ObjectPair
func ParseMetadata ¶
func ParseMetadata(b []byte) (ObjectPairArray, error)
func (ObjectPairArray) Find ¶
func (o ObjectPairArray) Find(key string) interface{}
func (ObjectPairArray) FindNumber ¶
func (o ObjectPairArray) FindNumber(key string) (int, error)
func (ObjectPairArray) FindString ¶
func (o ObjectPairArray) FindString(key string) (string, error)
type OnCompleteMessage ¶
type OnReadRTMPAVMsg ¶
type PubSessionObserver ¶
type PullSession ¶
type PullSession struct {
// contains filtered or unexported fields
}
func NewPullSession ¶
func NewPullSession(modOptions ...ModPullSessionOption) *PullSession
func (*PullSession) AppName ¶
func (s *PullSession) AppName() string
func (*PullSession) Dispose ¶
func (s *PullSession) Dispose()
func (*PullSession) GetStat ¶
func (s *PullSession) GetStat() base.StatSession
func (*PullSession) IsAlive ¶
func (s *PullSession) IsAlive() (readAlive, writeAlive bool)
func (*PullSession) Pull ¶
func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error
如果没有发生错误,阻塞直到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令
onReadRTMPAVMsg: 注意,回调结束后,内存块会被PullSession重复使用
func (*PullSession) RawQuery ¶
func (s *PullSession) RawQuery() string
func (*PullSession) StreamName ¶
func (s *PullSession) StreamName() string
func (*PullSession) UniqueKey ¶
func (s *PullSession) UniqueKey() string
func (*PullSession) UpdateStat ¶
func (s *PullSession) UpdateStat(interval uint32)
type PullSessionOption ¶
type PushSession ¶
type PushSession struct { IsFresh bool // contains filtered or unexported fields }
func NewPushSession ¶
func NewPushSession(modOptions ...ModPushSessionOption) *PushSession
func (*PushSession) AppName ¶
func (s *PushSession) AppName() string
func (*PushSession) AsyncWrite ¶
func (s *PushSession) AsyncWrite(msg []byte) error
func (*PushSession) Dispose ¶
func (s *PushSession) Dispose()
func (*PushSession) Flush ¶
func (s *PushSession) Flush() error
func (*PushSession) GetStat ¶
func (s *PushSession) GetStat() base.StatSession
func (*PushSession) IsAlive ¶
func (s *PushSession) IsAlive() (readAlive, writeAlive bool)
func (*PushSession) Push ¶
func (s *PushSession) Push(rawURL string) error
如果没有错误发生,阻塞到接收音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令
func (*PushSession) RawQuery ¶
func (s *PushSession) RawQuery() string
func (*PushSession) StreamName ¶
func (s *PushSession) StreamName() string
func (*PushSession) UniqueKey ¶
func (s *PushSession) UniqueKey() string
func (*PushSession) UpdateStat ¶
func (s *PushSession) UpdateStat(interval uint32)
func (*PushSession) Wait ¶
func (s *PushSession) Wait() <-chan error
type PushSessionOption ¶
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
func NewServer ¶
func NewServer(observer ServerObserver, addr string) *Server
func (*Server) OnNewRTMPPubSession ¶
func (server *Server) OnNewRTMPPubSession(session *ServerSession)
实现 ServerSessionObserver 接口
func (*Server) OnNewRTMPSubSession ¶
func (server *Server) OnNewRTMPSubSession(session *ServerSession)
实现 ServerSessionObserver 接口
func (*Server) OnRTMPConnect ¶
func (server *Server) OnRTMPConnect(session *ServerSession, opa ObjectPairArray)
实现 ServerSessionObserver 接口
type ServerObserver ¶
type ServerObserver interface { OnRTMPConnect(session *ServerSession, opa ObjectPairArray) OnNewRTMPPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接 OnDelRTMPPubSession(session *ServerSession) OnNewRTMPSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接 OnDelRTMPSubSession(session *ServerSession) }
type ServerSession ¶
type ServerSession struct { UniqueKey string // const after ctor // only for SubSession IsFresh bool // contains filtered or unexported fields }
func NewServerSession ¶
func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession
func (*ServerSession) AppName ¶
func (s *ServerSession) AppName() string
func (*ServerSession) AsyncWrite ¶
func (s *ServerSession) AsyncWrite(msg []byte) error
func (*ServerSession) Dispose ¶
func (s *ServerSession) Dispose()
func (*ServerSession) Flush ¶
func (s *ServerSession) Flush() error
func (*ServerSession) GetStat ¶
func (s *ServerSession) GetStat() base.StatSession
func (*ServerSession) IsAlive ¶
func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)
func (*ServerSession) RawQuery ¶
func (s *ServerSession) RawQuery() string
func (*ServerSession) RemoteAddr ¶
func (s *ServerSession) RemoteAddr() string
func (*ServerSession) RunLoop ¶
func (s *ServerSession) RunLoop() (err error)
func (*ServerSession) SetPubSessionObserver ¶
func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)
func (*ServerSession) StreamName ¶
func (s *ServerSession) StreamName() string
func (*ServerSession) URL ¶
func (s *ServerSession) URL() string
func (*ServerSession) UpdateStat ¶
func (s *ServerSession) UpdateStat(interval uint32)
type ServerSessionObserver ¶
type ServerSessionObserver interface { OnRTMPConnect(session *ServerSession, opa ObjectPairArray) OnNewRTMPPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听 OnNewRTMPSubSession(session *ServerSession) }
type ServerSessionType ¶
type ServerSessionType int
const ( ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态 ServerSessionTypePub ServerSessionTypeSub )