rtmp

package
v0.19.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2021 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AMF0TypeMarkerNumber     = uint8(0x00)
	AMF0TypeMarkerBoolean    = uint8(0x01)
	AMF0TypeMarkerString     = uint8(0x02)
	AMF0TypeMarkerObject     = uint8(0x03)
	AMF0TypeMarkerNull       = uint8(0x05)
	AMF0TypeMarkerEcmaArray  = uint8(0x08)
	AMF0TypeMarkerObjectEnd  = uint8(0x09)
	AMF0TypeMarkerLongString = uint8(0x0c)
)
View Source
const (
	CSIDAMF   = 5
	CSIDAudio = 6
	CSIDVideo = 7
)
View Source
const (
	//MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令
	MSID1 = 1 // publish、play、onStatus 以及 音视频数据
)

Variables

View Source
var (
	ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type")
	ErrAMFTooShort    = errors.New("lal.rtmp: too short to unmarshal amf0 data")
	ErrAMFNotExist    = errors.New("lal.rtmp: not exist")
)
View Source
var AMF0 amf0
View Source
var AMF0TypeMarkerObjectEndBytes = []byte{0, 0, AMF0TypeMarkerObjectEnd}
View Source
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
View Source
var ErrRTMP = errors.New("lal.rtmp: fxxk")
View Source
var (
	LocalChunkSize = 4096 // 本端设置的 chunk size

)

TODO chef 一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口

Functions

func BuildMetadata

func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)

spec-video_file_format_spec_v10.pdf onMetaData - duration DOUBLE, seconds - width DOUBLE - height DOUBLE - videodatarate DOUBLE - framerate DOUBLE - videocodecid DOUBLE - audiosamplerate DOUBLE - audiosamplesize DOUBLE - stereo BOOL - audiocodecid DOUBLE - filesize DOUBLE, bytes

目前包含的字段: - width - height - audiocodecid - videocodecid - version

width 如果为-1,则metadata中不写入该字段 height 如果为-1,则metadata中不写入该字段 audiocodecid 如果为-1,则metadata中不写入该字段

AAC 10

videocodecid 如果为-1,则metadata中不写入该字段

H264 7
H265 12

@return 返回的内存块为新申请的独立内存块

func Message2Chunks

func Message2Chunks(message []byte, header *base.RTMPHeader) []byte

@return 返回的内存块由内部申请,不依赖参数<message>内存块

Types

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

cb 回调结束后,内存块会被 ChunkComposer 再次使用

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

type ChunkDivider

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks

func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RTMPHeader) []byte

TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	UniqueKey string
	// contains filtered or unexported fields
}

rtmp 客户端类型连接的底层实现 package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession

func NewClientSession

func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession

t: session的类型,只能是推或者拉

func (*ClientSession) AppName

func (s *ClientSession) AppName() string

func (*ClientSession) AsyncWrite

func (s *ClientSession) AsyncWrite(msg []byte) error

func (*ClientSession) Dispose

func (s *ClientSession) Dispose()

func (*ClientSession) Do

func (s *ClientSession) Do(rawURL string) error

阻塞直到收到服务端返回的 publish / play 对应结果的信令或者发生错误

func (*ClientSession) Flush

func (s *ClientSession) Flush() error

func (*ClientSession) GetStat

func (s *ClientSession) GetStat() base.StatSession

func (*ClientSession) IsAlive

func (s *ClientSession) IsAlive() (readAlive, writeAlive bool)

func (*ClientSession) RawQuery

func (s *ClientSession) RawQuery() string

func (*ClientSession) StreamName

func (s *ClientSession) StreamName() string

func (*ClientSession) UpdateStat

func (s *ClientSession) UpdateStat(interval uint32)

func (*ClientSession) Wait

func (s *ClientSession) Wait() <-chan error

Do成功后,调用该函数,可阻塞直到推流或拉流结束

type ClientSessionOption

type ClientSessionOption struct {
	// 单位毫秒,如果为0,则没有超时
	DoTimeoutMS      int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
	ReadAVTimeoutMS  int // 读取音视频数据的超时
	WriteAVTimeoutMS int // 发送音视频数据的超时
}

type ClientSessionType

type ClientSessionType int
const (
	CSTPullSession ClientSessionType = iota
	CSTPushSession
)

type HandshakeClient

type HandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1S2(reader io.Reader) error
	WriteC2(writer io.Writer) error
}

type HandshakeClientComplex

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1S2

func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2

func (c *HandshakeClientComplex) WriteC2(write io.Writer) error

type HandshakeClientSimple

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1S2

func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2

func (c *HandshakeClientSimple) WriteC2(write io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

func NewMessagePacker

func NewMessagePacker() *MessagePacker

type ModClientSessionOption

type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption

type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption

type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

type ObjectPair struct {
	Key   string
	Value interface{}
}

type ObjectPairArray

type ObjectPairArray []ObjectPair

func ParseMetadata

func ParseMetadata(b []byte) (ObjectPairArray, error)

func (ObjectPairArray) Find

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindNumber

func (o ObjectPairArray) FindNumber(key string) (int, error)

func (ObjectPairArray) FindString

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage

type OnCompleteMessage func(stream *Stream) error

type OnReadRTMPAVMsg

type OnReadRTMPAVMsg func(msg base.RTMPMsg)

type PubSessionObserver

type PubSessionObserver interface {
	// 注意,回调结束后,内部会复用Payload内存块
	OnReadRTMPAVMsg(msg base.RTMPMsg)
}

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) AppName

func (s *PullSession) AppName() string

func (*PullSession) Dispose

func (s *PullSession) Dispose()

func (*PullSession) GetStat

func (s *PullSession) GetStat() base.StatSession

func (*PullSession) IsAlive

func (s *PullSession) IsAlive() (readAlive, writeAlive bool)

func (*PullSession) Pull

func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error

如果没有发生错误,阻塞直到到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令

onReadRTMPAVMsg: 注意,回调结束后,内存块会被PullSession重复使用

func (*PullSession) RawQuery

func (s *PullSession) RawQuery() string

func (*PullSession) StreamName

func (s *PullSession) StreamName() string

func (*PullSession) UniqueKey

func (s *PullSession) UniqueKey() string

func (*PullSession) UpdateStat

func (s *PullSession) UpdateStat(interval uint32)

func (*PullSession) Wait

func (s *PullSession) Wait() <-chan error

Pull成功后,调用该函数,可阻塞直到拉流结束

type PullSessionOption

type PullSessionOption struct {
	// 从调用Pull函数,到接收音视频数据的前一步,也即收到服务端返回的rtmp play对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	PullTimeoutMS int

	ReadAVTimeoutMS int
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AppName

func (s *PushSession) AppName() string

func (*PushSession) AsyncWrite

func (s *PushSession) AsyncWrite(msg []byte) error

func (*PushSession) Dispose

func (s *PushSession) Dispose()

func (*PushSession) Flush

func (s *PushSession) Flush() error

func (*PushSession) GetStat

func (s *PushSession) GetStat() base.StatSession

func (*PushSession) IsAlive

func (s *PushSession) IsAlive() (readAlive, writeAlive bool)

func (*PushSession) Push

func (s *PushSession) Push(rawURL string) error

如果没有错误发生,阻塞到接收音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令

func (*PushSession) RawQuery

func (s *PushSession) RawQuery() string

func (*PushSession) StreamName

func (s *PushSession) StreamName() string

func (*PushSession) UniqueKey

func (s *PushSession) UniqueKey() string

func (*PushSession) UpdateStat

func (s *PushSession) UpdateStat(interval uint32)

func (*PushSession) Wait

func (s *PushSession) Wait() <-chan error

type PushSessionOption

type PushSessionOption struct {
	// 从调用Push函数,到可以发送音视频数据的前一步,也即收到服务端返回的rtmp publish对应结果的信令的超时时间
	// 如果为0,则没有超时时间
	PushTimeoutMS int

	WriteAVTimeoutMS int
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(observer ServerObserver, addr string) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen

func (server *Server) Listen() (err error)

func (*Server) OnNewRTMPPubSession

func (server *Server) OnNewRTMPPubSession(session *ServerSession)

ServerSessionObserver

func (*Server) OnNewRTMPSubSession

func (server *Server) OnNewRTMPSubSession(session *ServerSession)

ServerSessionObserver

func (*Server) OnRTMPConnect

func (server *Server) OnRTMPConnect(session *ServerSession, opa ObjectPairArray)

ServerSessionObserver

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerObserver

type ServerObserver interface {
	OnRTMPConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRTMPPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接
	OnDelRTMPPubSession(session *ServerSession)
	OnNewRTMPSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接
	OnDelRTMPSubSession(session *ServerSession)
}

type ServerSession

type ServerSession struct {
	UniqueKey string // const after ctor

	// only for SubSession
	IsFresh bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(observer ServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AppName

func (s *ServerSession) AppName() string

func (*ServerSession) AsyncWrite

func (s *ServerSession) AsyncWrite(msg []byte) error

func (*ServerSession) Dispose

func (s *ServerSession) Dispose()

func (*ServerSession) Flush

func (s *ServerSession) Flush() error

func (*ServerSession) GetStat

func (s *ServerSession) GetStat() base.StatSession

func (*ServerSession) IsAlive

func (s *ServerSession) IsAlive() (readAlive, writeAlive bool)

func (*ServerSession) RawQuery

func (s *ServerSession) RawQuery() string

func (*ServerSession) RemoteAddr

func (s *ServerSession) RemoteAddr() string

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(observer PubSessionObserver)

func (*ServerSession) StreamName

func (s *ServerSession) StreamName() string

func (*ServerSession) URL

func (s *ServerSession) URL() string

func (*ServerSession) UpdateStat

func (s *ServerSession) UpdateStat(interval uint32)

type ServerSessionObserver

type ServerSessionObserver interface {
	OnRTMPConnect(session *ServerSession, opa ObjectPairArray)
	OnNewRTMPPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
	OnNewRTMPSubSession(session *ServerSession)
}

type ServerSessionType

type ServerSessionType int
const (
	ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态
	ServerSessionTypePub
	ServerSessionTypeSub
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL