rtmp

package
v0.24.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2021 License: MIT Imports: 17 Imported by: 8

Documentation

Index

Constants

View Source
const (
	Amf0TypeMarkerNumber     = uint8(0x00)
	Amf0TypeMarkerBoolean    = uint8(0x01)
	Amf0TypeMarkerString     = uint8(0x02)
	Amf0TypeMarkerObject     = uint8(0x03)
	Amf0TypeMarkerNull       = uint8(0x05)
	Amf0TypeMarkerEcmaArray  = uint8(0x08)
	Amf0TypeMarkerObjectEnd  = uint8(0x09)
	Amf0TypeMarkerLongString = uint8(0x0c)
)
View Source
const (
	CsidAmf   = 5
	CsidAudio = 6
	CsidVideo = 7
)
View Source
const (
	//MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令
	Msid1 = 1 // publish、play、onStatus 以及 音视频数据
)

Variables

View Source
var (
	ErrAmfInvalidType = errors.New("lal.rtmp: invalid amf0 type")
	ErrAmfTooShort    = errors.New("lal.rtmp: too short to unmarshal amf0 data")
	ErrAmfNotExist    = errors.New("lal.rtmp: not exist")
)
View Source
var Amf0 amf0
View Source
var Amf0TypeMarkerObjectEndBytes = []byte{0, 0, Amf0TypeMarkerObjectEnd}
View Source
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
View Source
var (
	ErrRtmp = errors.New("lal.rtmp: fxxk")
)
View Source
var (

	// 本端(包括Server Session和Client Session)设置的chunk size,本端发送数据时切割chunk包时使用
	// (对端发送数据时的chunk size由对端决定,和本变量没有关系)
	//
	// 注意,这个值不应该设置的太小,原因有两方面:
	// 1. 性能与带宽
	//    切割的chunk包过多,会消耗更多的CPU资源(包括本地和远端),另外还可能增加传输时的chunk header带宽消耗
	// 2. 兼容性
	//    理论上,信令也要参考chunk size切割成chunk包,而对端使用chunk包合成message的实现不一定标准。
	//    我就遇到过这样的case,对端认为rtmp握手后的几个信令,每个信令都只使用一个chunk。
	//    假如我们将一条信令切割成多个chunk,对端可能就解析错误了,这属于对端实现的问题。
	//    但为了更好的兼容性,我们不要将chunk size设置的太小。
	//
	LocalChunkSize = 4096
)

TODO chef 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口

Functions

func BuildMetadata added in v0.14.0

func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)

spec-video_file_format_spec_v10.pdf onMetaData:

- duration DOUBLE, seconds
- width DOUBLE
- height DOUBLE
- videodatarate DOUBLE
- framerate DOUBLE
- videocodecid DOUBLE
- audiosamplerate DOUBLE
- audiosamplesize DOUBLE
- stereo BOOL
- audiocodecid DOUBLE
- filesize DOUBLE, bytes

目前包含的字段:

- width
- height
- audiocodecid
- videocodecid
- version

@param width 如果为-1,则metadata中不写入该字段

@param height 如果为-1,则metadata中不写入该字段

@param audiocodecid 如果为-1,则metadata中不写入该字段

AAC 10

@param videocodecid 如果为-1,则metadata中不写入该字段

H264 7
H265 12

@return 返回的内存块为新申请的独立内存块

func Message2Chunks

func Message2Chunks(message []byte, header *base.RtmpHeader) []byte

@return 返回的内存块由内部申请,不依赖参数<message>内存块

Types

type Buffer added in v0.24.0

type Buffer struct {
	// contains filtered or unexported fields
}

func NewBuffer added in v0.24.0

func NewBuffer(n int) *Buffer

func (*Buffer) Bytes added in v0.24.0

func (b *Buffer) Bytes() []byte

func (*Buffer) Len added in v0.24.0

func (b *Buffer) Len() int

func (*Buffer) ModWritePos added in v0.24.0

func (b *Buffer) ModWritePos(pos int)

func (*Buffer) Reset added in v0.24.0

func (b *Buffer) Reset()

func (*Buffer) Write added in v0.24.0

func (b *Buffer) Write(p []byte) (n int, err error)

func (*Buffer) WriteByte added in v0.24.0

func (b *Buffer) WriteByte(c byte) error

func (*Buffer) WriteTo added in v0.24.0

func (b *Buffer) WriteTo(w io.Writer) (n int64, err error)

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

@param cb 回调结束后,内存块会被 ChunkComposer 再次使用

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

type ChunkDivider added in v0.3.0

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks added in v0.3.0

func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RtmpHeader) []byte

TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	// contains filtered or unexported fields
}

rtmp 客户端类型连接的底层实现。package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession。

func NewClientSession

func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession

@param t: session的类型,只能是推或者拉

func (*ClientSession) AppName added in v0.18.0

func (s *ClientSession) AppName() string

func (*ClientSession) Dispose added in v0.3.0

func (s *ClientSession) Dispose() error

func (*ClientSession) Do

func (s *ClientSession) Do(rawUrl string) error

阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误

func (*ClientSession) Flush added in v0.3.0

func (s *ClientSession) Flush() error

func (*ClientSession) GetStat added in v0.17.0

func (s *ClientSession) GetStat() base.StatSession

func (*ClientSession) IsAlive added in v0.18.0

func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)

func (*ClientSession) RawQuery added in v0.18.0

func (s *ClientSession) RawQuery() string

func (*ClientSession) StreamName added in v0.18.0

func (s *ClientSession) StreamName() string

func (*ClientSession) UniqueKey

func (s *ClientSession) UniqueKey() string

func (*ClientSession) UpdateStat added in v0.17.0

func (s *ClientSession) UpdateStat(intervalSec uint32)

func (*ClientSession) Url added in v0.23.0

func (s *ClientSession) Url() string

func (*ClientSession) WaitChan added in v0.20.0

func (s *ClientSession) WaitChan() <-chan error

func (*ClientSession) Write added in v0.20.0

func (s *ClientSession) Write(msg []byte) error

type ClientSessionOption added in v0.5.0

type ClientSessionOption struct {
	// 单位毫秒,如果为0,则没有超时
	DoTimeoutMs          int  // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
	ReadAvTimeoutMs      int  // 读取音视频数据的超时
	WriteAvTimeoutMs     int  // 发送音视频数据的超时
	HandshakeComplexFlag bool // 握手是否使用复杂模式
}

type ClientSessionType

type ClientSessionType int
const (
	CstPullSession ClientSessionType = iota
	CstPushSession
)

type HandshakeClientComplex added in v0.1.0

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1 added in v0.22.0

func (c *HandshakeClientComplex) ReadS0S1(reader io.Reader) error

func (*HandshakeClientComplex) ReadS2 added in v0.22.0

func (c *HandshakeClientComplex) ReadS2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1 added in v0.1.0

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2 added in v0.1.0

func (c *HandshakeClientComplex) WriteC2(writer io.Writer) error

type HandshakeClientSimple added in v0.1.0

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1 added in v0.22.0

func (c *HandshakeClientSimple) ReadS0S1(reader io.Reader) error

func (*HandshakeClientSimple) ReadS2 added in v0.22.0

func (c *HandshakeClientSimple) ReadS2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1 added in v0.1.0

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2 added in v0.1.0

func (c *HandshakeClientSimple) WriteC2(writer io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(writer io.Writer) error

type IHandshakeClient added in v0.22.0

type IHandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1(reader io.Reader) error
	WriteC2(writer io.Writer) error
	ReadS2(reader io.Reader) error
}

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

打包并发送 rtmp 信令

func NewMessagePacker

func NewMessagePacker() *MessagePacker

func (*MessagePacker) ChunkAndWrite added in v0.24.0

func (packer *MessagePacker) ChunkAndWrite(writer io.Writer, csid int, typeid uint8, streamid int) error

type ModClientSessionOption added in v0.5.0

type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption added in v0.5.0

type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption added in v0.5.0

type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

type ObjectPair struct {
	Key   string
	Value interface{}
}

type ObjectPairArray added in v0.12.0

type ObjectPairArray []ObjectPair

func ParseMetadata added in v0.12.0

func ParseMetadata(b []byte) (ObjectPairArray, error)

func (ObjectPairArray) Find added in v0.12.0

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindNumber added in v0.16.0

func (o ObjectPairArray) FindNumber(key string) (int, error)

func (ObjectPairArray) FindString added in v0.12.0

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage added in v0.13.0

type OnCompleteMessage func(stream *Stream) error

type OnReadRtmpAvMsg added in v0.23.0

type OnReadRtmpAvMsg func(msg base.RtmpMsg)

type PubSessionObserver

type PubSessionObserver interface {
	// 注意,回调结束后,内部会复用Payload内存块
	OnReadRtmpAvMsg(msg base.RtmpMsg)
}

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) AppName added in v0.18.0

func (s *PullSession) AppName() string

文档请参考: interface ISessionUrlContext

func (*PullSession) Dispose added in v0.5.0

func (s *PullSession) Dispose() error

文档请参考: interface IClientSessionLifecycle

func (*PullSession) GetStat added in v0.17.0

func (s *PullSession) GetStat() base.StatSession

文档请参考: interface ISessionStat

func (*PullSession) IsAlive added in v0.17.0

func (s *PullSession) IsAlive() (readAlive, writeAlive bool)

文档请参考: interface ISessionStat

func (*PullSession) Pull

func (s *PullSession) Pull(rawUrl string, onReadRtmpAvMsg OnReadRtmpAvMsg) error

阻塞直到和对端完成拉流前的所有准备工作(也即收到RTMP Play response),或者发生错误

@param onReadRtmpAvMsg: msg: 注意,回调结束后,`msg`的内存块会被`PullSession`重复使用

也即多次回调的`msg`是复用的同一块内存块
如果业务方需要在回调结束后,依然持有`msg`,那么需要对`msg`进行拷贝,比如调用`msg.Clone()`
只在回调中使用`msg`,则不需要拷贝

func (*PullSession) RawQuery added in v0.18.0

func (s *PullSession) RawQuery() string

文档请参考: interface ISessionUrlContext

func (*PullSession) StreamName added in v0.18.0

func (s *PullSession) StreamName() string

文档请参考: interface ISessionUrlContext

func (*PullSession) UniqueKey added in v0.12.0

func (s *PullSession) UniqueKey() string

文档请参考: interface IObject

func (*PullSession) UpdateStat added in v0.17.0

func (s *PullSession) UpdateStat(intervalSec uint32)

文档请参考: interface ISessionStat

func (*PullSession) Url added in v0.23.0

func (s *PullSession) Url() string

文档请参考: interface ISessionUrlContext

func (*PullSession) WaitChan added in v0.20.0

func (s *PullSession) WaitChan() <-chan error

文档请参考: interface IClientSessionLifecycle

type PullSessionOption added in v0.5.0

type PullSessionOption struct {
	// 从调用Pull函数,到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	PullTimeoutMs int

	ReadAvTimeoutMs      int
	HandshakeComplexFlag bool
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AppName added in v0.18.0

func (s *PushSession) AppName() string

文档请参考: interface ISessionUrlContext

func (*PushSession) Dispose added in v0.5.0

func (s *PushSession) Dispose() error

文档请参考: interface IClientSessionLifecycle

func (*PushSession) Flush added in v0.5.0

func (s *PushSession) Flush() error

将缓存的数据立即刷新发送。是否有缓存策略,请参见配置及内部实现。

func (*PushSession) GetStat added in v0.18.0

func (s *PushSession) GetStat() base.StatSession

文档请参考: interface ISessionStat

func (*PushSession) IsAlive added in v0.18.0

func (s *PushSession) IsAlive() (readAlive, writeAlive bool)

文档请参考: interface ISessionStat

func (*PushSession) Push

func (s *PushSession) Push(rawUrl string) error

阻塞直到和对端完成推流前,握手部分的工作(也即收到RTMP Publish response),或者发生错误

func (*PushSession) RawQuery added in v0.18.0

func (s *PushSession) RawQuery() string

文档请参考: interface ISessionUrlContext

func (*PushSession) StreamName added in v0.18.0

func (s *PushSession) StreamName() string

文档请参考: interface ISessionUrlContext

func (*PushSession) UniqueKey added in v0.11.0

func (s *PushSession) UniqueKey() string

文档请参考: interface IObject

func (*PushSession) UpdateStat added in v0.18.0

func (s *PushSession) UpdateStat(intervalSec uint32)

文档请参考: interface ISessionStat

func (*PushSession) Url added in v0.23.0

func (s *PushSession) Url() string

文档请参考: interface ISessionUrlContext

func (*PushSession) WaitChan added in v0.20.0

func (s *PushSession) WaitChan() <-chan error

文档请参考: interface IClientSessionLifecycle

func (*PushSession) Write added in v0.20.0

func (s *PushSession) Write(msg []byte) error

发送数据。注意,业务方需将数据打包成rtmp chunk格式后,再调用该函数发送。

type PushSessionOption added in v0.5.0

type PushSessionOption struct {
	// 从调用Push函数,到可以发送音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	PushTimeoutMs int

	WriteAvTimeoutMs     int
	HandshakeComplexFlag bool
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(observer ServerObserver, addr string) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen added in v0.10.0

func (server *Server) Listen() (err error)

func (*Server) OnNewRtmpPubSession added in v0.23.0

func (server *Server) OnNewRtmpPubSession(session *ServerSession)

实现 ServerSessionObserver 接口

func (*Server) OnNewRtmpSubSession added in v0.23.0

func (server *Server) OnNewRtmpSubSession(session *ServerSession)

实现 ServerSessionObserver 接口

func (*Server) OnRtmpConnect added in v0.23.0

func (server *Server) OnRtmpConnect(session *ServerSession, opa ObjectPairArray)

实现 ServerSessionObserver 接口

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerObserver

type ServerObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRtmpPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接
	OnDelRtmpPubSession(session *ServerSession)
	OnNewRtmpSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接
	OnDelRtmpSubSession(session *ServerSession)
}

type ServerSession

type ServerSession struct {

	// only for SubSession
	IsFresh                 bool
	ShouldWaitVideoKeyFrame bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AppName

func (s *ServerSession) AppName() string

func (*ServerSession) Dispose

func (s *ServerSession) Dispose() error

func (*ServerSession) Flush added in v0.3.0

func (s *ServerSession) Flush() error

func (*ServerSession) GetStat added in v0.16.0

func (s *ServerSession) GetStat() base.StatSession

func (*ServerSession) IsAlive added in v0.17.0

func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)

func (*ServerSession) RawQuery added in v0.17.0

func (s *ServerSession) RawQuery() string

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)

func (*ServerSession) StreamName

func (s *ServerSession) StreamName() string

func (*ServerSession) UniqueKey

func (s *ServerSession) UniqueKey() string

func (*ServerSession) UpdateStat added in v0.16.0

func (s *ServerSession) UpdateStat(intervalSec uint32)

func (*ServerSession) Url added in v0.23.0

func (s *ServerSession) Url() string

func (*ServerSession) Write added in v0.20.0

func (s *ServerSession) Write(msg []byte) error

type ServerSessionObserver

type ServerSessionObserver interface {
	OnRtmpConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRtmpPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
	OnNewRtmpSubSession(session *ServerSession)
}

type ServerSessionType

type ServerSessionType int
const (
	ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态
	ServerSessionTypePub
	ServerSessionTypeSub
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL