rtmp

package
v0.24.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2021 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// AMF0 type markers. Each marker is a single byte (uint8) that identifies the
// type of an AMF0-encoded value.
const (
	Amf0TypeMarkerNumber     uint8 = 0x00 // number
	Amf0TypeMarkerBoolean    uint8 = 0x01 // boolean
	Amf0TypeMarkerString     uint8 = 0x02 // string
	Amf0TypeMarkerObject     uint8 = 0x03 // object
	Amf0TypeMarkerNull       uint8 = 0x05 // null
	Amf0TypeMarkerEcmaArray  uint8 = 0x08 // ECMA array
	Amf0TypeMarkerObjectEnd  uint8 = 0x09 // object end
	Amf0TypeMarkerLongString uint8 = 0x0c // long string
)
View Source
// Csid* constants for AMF, audio and video.
// NOTE(review): presumably the RTMP chunk stream IDs (csid) used when sending
// each kind of message — confirm against the call sites.
const (
	CsidAmf   = iota + 5 // 5
	CsidAudio            // 6
	CsidVideo            // 7
)
View Source
// Msid1 is used for publish, play, onStatus, and audio/video data.
//
// The companion value for all other signaling is commented out in the source:
//
//	MSID0 = 0 // all signaling except publish, play and onStatus
const Msid1 = 1

Variables

View Source
// ErrAmfInvalidType is the sentinel error for an invalid AMF0 type.
var ErrAmfInvalidType = errors.New("lal.rtmp: invalid amf0 type")

// ErrAmfTooShort is the sentinel error for input that is too short to be
// unmarshalled as AMF0 data.
var ErrAmfTooShort = errors.New("lal.rtmp: too short to unmarshal amf0 data")

// ErrAmfNotExist is the sentinel "not exist" error.
// NOTE(review): presumably returned when a looked-up key is missing — confirm
// against the ObjectPairArray Find* methods.
var ErrAmfNotExist = errors.New("lal.rtmp: not exist")
View Source
// Amf0 is a package-level variable of the unexported type amf0.
// NOTE(review): presumably the entry point for the AMF0 encode/decode helpers —
// confirm against the method set of amf0.
var Amf0 amf0
View Source
// Amf0TypeMarkerObjectEndBytes is the three-byte sequence made of two zero
// bytes followed by the object-end type marker.
var Amf0TypeMarkerObjectEndBytes = []byte{0x00, 0x00, Amf0TypeMarkerObjectEnd}
View Source
// ErrClientSessionTimeout is the sentinel error for a client session timeout.
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
View Source
// ErrRtmp is a package-level sentinel error.
// NOTE(review): the message text does not describe any failure; consider a
// meaningful message, but check first whether callers match on the current
// string.
var ErrRtmp = errors.New("lal.rtmp: fxxk")
View Source
// LocalChunkSize is the chunk size set by the local side (both Server Session
// and Client Session). It is used to split outgoing data into chunks.
// (The chunk size used by the peer when it sends data is decided by the peer
// and is unrelated to this variable.)
//
// Note: this value should not be set too small, for two reasons:
//
//  1. Performance and bandwidth.
//     Splitting into too many chunks consumes more CPU (locally and remotely)
//     and may also increase the bandwidth spent on chunk headers.
//
//  2. Compatibility.
//     In theory, signaling messages are also split into chunks according to
//     the chunk size, but a peer's reassembly of chunks into messages is not
//     always standard-conforming. A case has been seen where the peer assumed
//     that each of the first few signaling messages after the rtmp handshake
//     uses exactly one chunk. If such a message is split into several chunks,
//     the peer may fail to parse it. That is a defect of the peer, but for
//     better compatibility the chunk size should not be too small.
var LocalChunkSize = 4096

TODO chef 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口

Functions

func BuildMetadata

func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)

spec-video_file_format_spec_v10.pdf onMetaData:

- duration DOUBLE, seconds
- width DOUBLE
- height DOUBLE
- videodatarate DOUBLE
- framerate DOUBLE
- videocodecid DOUBLE
- audiosamplerate DOUBLE
- audiosamplesize DOUBLE
- stereo BOOL
- audiocodecid DOUBLE
- filesize DOUBLE, bytes

目前包含的字段:

- width
- height
- audiocodecid
- videocodecid
- version

width 如果为-1,则metadata中不写入该字段

height 如果为-1,则metadata中不写入该字段

audiocodecid 如果为-1,则metadata中不写入该字段

AAC 10

videocodecid 如果为-1,则metadata中不写入该字段

H264 7
H265 12

@return 返回的内存块为新申请的独立内存块

func Message2Chunks

func Message2Chunks(message []byte, header *base.RtmpHeader) []byte

@return 返回的内存块由内部申请,不依赖参数<message>内存块

Types

type Buffer added in v0.24.1

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer added in v0.24.1

func NewBuffer(n int) *Buffer

func (*Buffer) Bytes added in v0.24.1

func (b *Buffer) Bytes() []byte

func (*Buffer) Len added in v0.24.1

func (b *Buffer) Len() int

func (*Buffer) ModWritePos added in v0.24.1

func (b *Buffer) ModWritePos(pos int)

func (*Buffer) Reset added in v0.24.1

func (b *Buffer) Reset()

func (*Buffer) Write added in v0.24.1

func (b *Buffer) Write(p []byte) (n int, err error)

func (*Buffer) WriteByte added in v0.24.1

func (b *Buffer) WriteByte(c byte) error

func (*Buffer) WriteTo added in v0.24.1

func (b *Buffer) WriteTo(w io.Writer) (n int64, err error)

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

RunLoop 将rtmp chunk合并为message

@param cb: stream.msg: 注意,回调结束后,`msg`的内存块会被`ChunkComposer`重复使用

            也即多次回调的`msg`是复用的同一块内存块
            如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝
            只在回调中使用`msg`,则不需要拷贝

cb return:  如果cb返回的error不为nil,则`RunLoop`停止阻塞,并返回这个错误

@return 阻塞直到发生错误

TODO chef: msglen支持最大阈值,超过可以认为对端是非法的

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

type ChunkDivider

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks

func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte

TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	// contains filtered or unexported fields
}

rtmp 客户端类型连接的底层实现。package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession。

func NewClientSession

func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession

t: session的类型,只能是推或者拉

func (*ClientSession) AppName

func (s *ClientSession) AppName() string

func (*ClientSession) Dispose

func (s *ClientSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*ClientSession) Do

func (s *ClientSession) Do(rawUrl string) error

阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误

func (*ClientSession) Flush

func (s *ClientSession) Flush() error

func (*ClientSession) GetStat

func (s *ClientSession) GetStat() base.StatSession

func (*ClientSession) IsAlive

func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)

func (*ClientSession) RawQuery

func (s *ClientSession) RawQuery() string

func (*ClientSession) StreamName

func (s *ClientSession) StreamName() string

func (*ClientSession) UniqueKey

func (s *ClientSession) UniqueKey() string

func (*ClientSession) UpdateStat

func (s *ClientSession) UpdateStat(intervalSec uint32)

func (*ClientSession) Url added in v0.24.1

func (s *ClientSession) Url() string

func (*ClientSession) WaitChan added in v0.19.12

func (s *ClientSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

func (*ClientSession) Write added in v0.19.12

func (s *ClientSession) Write(msg []byte) error

type ClientSessionOption

// ClientSessionOption holds the options of a ClientSession. It is the type
// modified by ModClientSessionOption functions.
type ClientSessionOption struct {
	// All timeouts are in milliseconds; 0 means no timeout.
	DoTimeoutMs          int  // timeout from initiating the connection (connection establishment included) until the result of the publish or play command is received
	ReadAvTimeoutMs      int  // timeout for reading audio/video data
	WriteAvTimeoutMs     int  // timeout for sending audio/video data
	HandshakeComplexFlag bool // whether the handshake uses complex mode
}

type ClientSessionType

// ClientSessionType is the kind of a ClientSession: it can only be pull or push.
type ClientSessionType int

// Values of ClientSessionType.
const (
	CstPullSession ClientSessionType = 0 // pull session
	CstPushSession ClientSessionType = 1 // push session
)

type HandshakeClientComplex

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1 added in v0.19.12

func (c *HandshakeClientComplex) ReadS0S1(reader io.Reader) error

func (*HandshakeClientComplex) ReadS2 added in v0.19.12

func (c *HandshakeClientComplex) ReadS2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2

func (c *HandshakeClientComplex) WriteC2(writer io.Writer) error

type HandshakeClientSimple

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1 added in v0.19.12

func (c *HandshakeClientSimple) ReadS0S1(reader io.Reader) error

func (*HandshakeClientSimple) ReadS2 added in v0.19.12

func (c *HandshakeClientSimple) ReadS2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2

func (c *HandshakeClientSimple) WriteC2(writer io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(writer io.Writer) error

type IHandshakeClient added in v0.19.12

// IHandshakeClient is the client side of the rtmp handshake. Both
// HandshakeClientSimple and HandshakeClientComplex provide these four methods.
//
// NOTE(review): the method names follow the RTMP handshake packets (the client
// writes C0, C1 and C2 and reads S0, S1 and S2); the required call order is
// not visible here — confirm against ClientSession.
type IHandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1(reader io.Reader) error
	WriteC2(writer io.Writer) error
	ReadS2(reader io.Reader) error
}

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

打包并发送 rtmp 信令

func NewMessagePacker

func NewMessagePacker() *MessagePacker

func (*MessagePacker) ChunkAndWrite added in v0.24.1

func (packer *MessagePacker) ChunkAndWrite(writer io.Writer, csid int, typeid uint8, streamid int) error

type ModClientSessionOption

// ModClientSessionOption is a function that modifies a ClientSessionOption in
// place. NewClientSession accepts any number of them.
type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption

// ModPullSessionOption is a function that modifies a PullSessionOption in
// place. NewPullSession accepts any number of them.
type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption

// ModPushSessionOption is a function that modifies a PushSessionOption in
// place. NewPushSession accepts any number of them.
type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

// ObjectPair is a single key/value entry; ObjectPairArray is a slice of them.
type ObjectPair struct {
	Key   string
	Value interface{} // dynamically typed value; see ObjectPairArray.FindNumber and FindString for typed access
}

type ObjectPairArray

// ObjectPairArray is a slice of ObjectPair. It is the result type of
// ParseMetadata and the type of the opa argument of OnRtmpConnect.
type ObjectPairArray []ObjectPair

func ParseMetadata

func ParseMetadata(b []byte) (ObjectPairArray, error)

func (ObjectPairArray) Find

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindNumber

func (o ObjectPairArray) FindNumber(key string) (int, error)

func (ObjectPairArray) FindString

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage

// OnCompleteMessage is the callback type accepted by ChunkComposer.RunLoop.
//
// The message memory reachable through stream is reused by ChunkComposer after
// the callback returns; copy it if it must outlive the callback. Returning a
// non-nil error makes RunLoop stop blocking and return that error.
type OnCompleteMessage func(stream *Stream) error

type OnReadRtmpAvMsg added in v0.24.1

// OnReadRtmpAvMsg is the callback type accepted by PullSession.Pull.
//
// The memory block of msg is reused by PullSession after the callback returns;
// to keep msg longer, copy it first, for example with msg.Clone().
type OnReadRtmpAvMsg func(msg base.RtmpMsg)

type PubSessionObserver

// PubSessionObserver is the observer installed on a ServerSession with
// SetPubSessionObserver.
type PubSessionObserver interface {
	// Note: after the callback returns, the Payload memory block is reused internally.
	OnReadRtmpAvMsg(msg base.RtmpMsg)
}

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) AppName

func (s *PullSession) AppName() string

文档请参考: interface ISessionUrlContext

func (*PullSession) Dispose

func (s *PullSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*PullSession) GetStat

func (s *PullSession) GetStat() base.StatSession

文档请参考: interface ISessionStat

func (*PullSession) IsAlive

func (s *PullSession) IsAlive() (readAlive, writeAlive bool)

文档请参考: interface ISessionStat

func (*PullSession) Pull

func (s *PullSession) Pull(rawUrl string, onReadRtmpAvMsg OnReadRtmpAvMsg) error

阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误

@param onReadRtmpAvMsg: msg: 注意,回调结束后,`msg`的内存块会被`PullSession`重复使用

也即多次回调的`msg`是复用的同一块内存块
如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝,比如调用`msg.Clone()`
只在回调中使用`msg`,则不需要拷贝

func (*PullSession) RawQuery

func (s *PullSession) RawQuery() string

文档请参考: interface ISessionUrlContext

func (*PullSession) StreamName

func (s *PullSession) StreamName() string

文档请参考: interface ISessionUrlContext

func (*PullSession) UniqueKey

func (s *PullSession) UniqueKey() string

文档请参考: interface IObject

func (*PullSession) UpdateStat

func (s *PullSession) UpdateStat(intervalSec uint32)

文档请参考: interface ISessionStat

func (*PullSession) Url added in v0.24.1

func (s *PullSession) Url() string

文档请参考: interface ISessionUrlContext

func (*PullSession) WaitChan added in v0.19.12

func (s *PullSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

type PullSessionOption

// PullSessionOption holds the options of a PullSession. It is the type
// modified by ModPullSessionOption functions.
type PullSessionOption struct {
	// Timeout from calling Pull until the step just before audio/video data is
	// received, i.e. until the server's result for the rtmp play command arrives.
	// 0 means no timeout.
	PullTimeoutMs int

	ReadAvTimeoutMs      int  // NOTE(review): presumably the same meaning as ClientSessionOption.ReadAvTimeoutMs — confirm
	HandshakeComplexFlag bool // NOTE(review): presumably the same meaning as ClientSessionOption.HandshakeComplexFlag — confirm
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AppName

func (s *PushSession) AppName() string

文档请参考: interface ISessionUrlContext

func (*PushSession) Dispose

func (s *PushSession) Dispose() error

Dispose 文档请参考: IClientSessionLifecycle interface

func (*PushSession) Flush

func (s *PushSession) Flush() error

将缓存的数据立即刷新发送。是否有缓存策略,请参见配置及内部实现。

func (*PushSession) GetStat

func (s *PushSession) GetStat() base.StatSession

文档请参考: interface ISessionStat

func (*PushSession) IsAlive

func (s *PushSession) IsAlive() (readAlive, writeAlive bool)

文档请参考: interface ISessionStat

func (*PushSession) Push

func (s *PushSession) Push(rawUrl string) error

阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误

func (*PushSession) RawQuery

func (s *PushSession) RawQuery() string

文档请参考: interface ISessionUrlContext

func (*PushSession) StreamName

func (s *PushSession) StreamName() string

文档请参考: interface ISessionUrlContext

func (*PushSession) UniqueKey

func (s *PushSession) UniqueKey() string

文档请参考: interface IObject

func (*PushSession) UpdateStat

func (s *PushSession) UpdateStat(intervalSec uint32)

文档请参考: interface ISessionStat

func (*PushSession) Url added in v0.24.1

func (s *PushSession) Url() string

文档请参考: interface ISessionUrlContext

func (*PushSession) WaitChan added in v0.19.12

func (s *PushSession) WaitChan() <-chan error

WaitChan 文档请参考: IClientSessionLifecycle interface

func (*PushSession) Write added in v0.19.12

func (s *PushSession) Write(msg []byte) error

发送数据。注意,业务方需将数据打包成rtmp chunk格式后,再调用该函数发送。

type PushSessionOption

// PushSessionOption holds the options of a PushSession. It is the type
// modified by ModPushSessionOption functions.
type PushSessionOption struct {
	// Timeout from calling Push until the step just before audio/video data can
	// be sent, i.e. until the server's result for the rtmp publish command arrives.
	// 0 means no timeout.
	PushTimeoutMs int

	WriteAvTimeoutMs     int  // NOTE(review): presumably the same meaning as ClientSessionOption.WriteAvTimeoutMs — confirm
	HandshakeComplexFlag bool // NOTE(review): presumably the same meaning as ClientSessionOption.HandshakeComplexFlag — confirm
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(observer ServerObserver, addr string) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen

func (server *Server) Listen() (err error)

func (*Server) OnNewRtmpPubSession added in v0.24.1

func (server *Server) OnNewRtmpPubSession(session *ServerSession)

Implements the ServerSessionObserver interface.

func (*Server) OnNewRtmpSubSession added in v0.24.1

func (server *Server) OnNewRtmpSubSession(session *ServerSession)

Implements the ServerSessionObserver interface.

func (*Server) OnRtmpConnect added in v0.24.1

func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray)

Implements the ServerSessionObserver interface.

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerObserver

// ServerObserver is the observer type passed to NewServer.
type ServerObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRtmpPubSession(session *ServerSession) bool // return true to allow publishing; return false to force-close the connection
	OnDelRtmpPubSession(session *ServerSession)
	OnNewRtmpSubSession(session *ServerSession) bool // return true to allow pulling; return false to force-close the connection
	OnDelRtmpSubSession(session *ServerSession)
}

type ServerSession

type ServerSession struct {

	// only for SubSession
	IsFresh                 bool
	ShouldWaitVideoKeyFrame bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AppName

func (s *ServerSession) AppName() string

func (*ServerSession) Dispose

func (s *ServerSession) Dispose() error

func (*ServerSession) Flush

func (s *ServerSession) Flush() error

func (*ServerSession) GetStat

func (s *ServerSession) GetStat() base.StatSession

func (*ServerSession) IsAlive

func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)

func (*ServerSession) RawQuery

func (s *ServerSession) RawQuery() string

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)

func (*ServerSession) StreamName

func (s *ServerSession) StreamName() string

func (*ServerSession) UniqueKey

func (s *ServerSession) UniqueKey() string

func (*ServerSession) UpdateStat

func (s *ServerSession) UpdateStat(intervalSec uint32)

func (*ServerSession) Url added in v0.24.1

func (s *ServerSession) Url() string

func (*ServerSession) Write added in v0.19.12

func (s *ServerSession) Write(msg []byte) error

type ServerSessionObserver

// ServerSessionObserver is the observer type passed to NewServerSession.
// Server provides methods with these three signatures.
type ServerSessionObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRtmpPubSession(session *ServerSession) // upper-layer code should register its audio/video data listener in this callback
	OnNewRtmpSubSession(session *ServerSession)
}

type ServerSessionType

// ServerSessionType is the kind of a ServerSession.
type ServerSessionType int

// Values of ServerSessionType.
const (
	ServerSessionTypeUnknown ServerSessionType = 0 // type state before the client's publish or play command is received
	ServerSessionTypePub     ServerSessionType = 1
	ServerSessionTypeSub     ServerSessionType = 2
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL