rtmp

package
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 23, 2020 License: MIT Imports: 17 Imported by: 7

Documentation

Index

Constants

View Source
const (
	AMF0TypeMarkerNumber     = uint8(0x00)
	AMF0TypeMarkerBoolean    = uint8(0x01)
	AMF0TypeMarkerString     = uint8(0x02)
	AMF0TypeMarkerObject     = uint8(0x03)
	AMF0TypeMarkerNull       = uint8(0x05)
	AMF0TypeMarkerEcmaArray  = uint8(0x08)
	AMF0TypeMarkerObjectEnd  = uint8(0x09)
	AMF0TypeMarkerLongString = uint8(0x0c)
)
View Source
const (
	CSIDAMF   = 5
	CSIDAudio = 6
	CSIDVideo = 7
)
View Source
const (
	//MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令
	MSID1 = 1 // publish、play、onStatus 以及 音视频数据
)

Variables

View Source
var (
	ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type")
	ErrAMFTooShort    = errors.New("lal.rtmp: too short to unmarshal amf0 data")
	ErrAMFNotExist    = errors.New("lal.rtmp: not exist")
)
View Source
var AMF0 amf0
View Source
var AMF0TypeMarkerObjectEndBytes = []byte{0, 0, AMF0TypeMarkerObjectEnd}
View Source
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
View Source
var ErrRTMP = errors.New("lal.rtmp: fxxk")
View Source
var (
	LocalChunkSize = 4096 // 本端设置的 chunk size

)

一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口

Functions

func BuildMetadata added in v0.14.0

func BuildMetadata(width int, height int, audiocodecid int, videocodecid int) ([]byte, error)

spec-video_file_format_spec_v10.pdf onMetaData - duration DOUBLE, seconds - width DOUBLE - height DOUBLE - videodatarate DOUBLE - framerate DOUBLE - videocodecid DOUBLE - audiosamplerate DOUBLE - audiosamplesize DOUBLE - stereo BOOL - audiocodecid DOUBLE - filesize DOUBLE, bytes

- encoder - server - author - version

@param audiocodecid AAC 10 @param videocodecid AVC 7

func Message2Chunks

func Message2Chunks(message []byte, header *base.RTMPHeader) []byte

@param header 注意,内部使用TimestampAbs而非Timestamp @return 返回的内存块由内部申请,不依赖参数<message>内存块

Types

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

@param cb 回调结束后,内存块会被 ChunkComposer 再次使用

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

type ChunkDivider added in v0.3.0

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks added in v0.3.0

func (d *ChunkDivider) Message2Chunks(message []byte, header *base.RTMPHeader) []byte

TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	UniqueKey string
	// contains filtered or unexported fields
}

rtmp 客户端类型连接的底层实现 package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession

func NewClientSession

func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession

@param t: session的类型,只能是推或者拉

func (*ClientSession) AsyncWrite added in v0.3.0

func (s *ClientSession) AsyncWrite(msg []byte) error

func (*ClientSession) Dispose added in v0.3.0

func (s *ClientSession) Dispose()

func (*ClientSession) Done added in v0.11.0

func (s *ClientSession) Done() <-chan error

func (*ClientSession) Flush added in v0.3.0

func (s *ClientSession) Flush() error

type ClientSessionOption added in v0.5.0

type ClientSessionOption struct {
	// 单位毫秒,如果为0,则没有超时
	ConnectTimeoutMS int // 建立连接超时
	DoTimeoutMS      int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
	ReadAVTimeoutMS  int // 读取音视频数据的超时
	WriteAVTimeoutMS int // 发送音视频数据的超时
}

type ClientSessionType

type ClientSessionType int
const (
	CSTPullSession ClientSessionType = iota
	CSTPushSession
)

type HandshakeClient

type HandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1S2(reader io.Reader) error
	WriteC2(writer io.Writer) error
}

type HandshakeClientComplex added in v0.1.0

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1S2 added in v0.1.0

func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1 added in v0.1.0

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2 added in v0.1.0

func (c *HandshakeClientComplex) WriteC2(write io.Writer) error

type HandshakeClientSimple added in v0.1.0

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1S2 added in v0.1.0

func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1 added in v0.1.0

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2 added in v0.1.0

func (c *HandshakeClientSimple) WriteC2(write io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

func NewMessagePacker

func NewMessagePacker() *MessagePacker

type ModClientSessionOption added in v0.5.0

type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption added in v0.5.0

type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption added in v0.5.0

type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

type ObjectPair struct {
	Key   string
	Value interface{}
}

type ObjectPairArray added in v0.12.0

type ObjectPairArray []ObjectPair

func ParseMetadata added in v0.12.0

func ParseMetadata(b []byte) (ObjectPairArray, error)

TODO chef: 见ServerSession::doDataMessageAMF0

func (ObjectPairArray) Find added in v0.12.0

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindString added in v0.12.0

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage added in v0.13.0

type OnCompleteMessage func(stream *Stream) error

type OnReadRTMPAVMsg added in v0.5.0

type OnReadRTMPAVMsg func(msg base.RTMPMsg)

type PubSessionObserver

type PubSessionObserver interface {
	// 注意,回调结束后,内部会复用Payload内存块
	OnReadRTMPAVMsg(msg base.RTMPMsg)
}

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) Dispose added in v0.5.0

func (s *PullSession) Dispose()

func (*PullSession) Done added in v0.12.0

func (s *PullSession) Done() <-chan error

func (*PullSession) Pull

func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error

建立rtmp play连接 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误

@param onReadRTMPAVMsg: 注意,回调结束后,内存块会被PullSession重复使用

func (*PullSession) UniqueKey added in v0.12.0

func (s *PullSession) UniqueKey() string

type PullSessionOption added in v0.5.0

type PullSessionOption struct {
	ConnectTimeoutMS int
	PullTimeoutMS    int
	ReadAVTimeoutMS  int
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AsyncWrite added in v0.5.0

func (s *PushSession) AsyncWrite(msg []byte) error

func (*PushSession) Dispose added in v0.5.0

func (s *PushSession) Dispose()

func (*PushSession) Done added in v0.11.0

func (s *PushSession) Done() <-chan error

func (*PushSession) Flush added in v0.5.0

func (s *PushSession) Flush() error

func (*PushSession) Push

func (s *PushSession) Push(rawURL string) error

建立rtmp publish连接 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误

func (*PushSession) UniqueKey added in v0.11.0

func (s *PushSession) UniqueKey() string

type PushSessionOption added in v0.5.0

type PushSessionOption struct {
	ConnectTimeoutMS int
	PushTimeoutMS    int
	WriteAVTimeoutMS int
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(obs ServerObserver, addr string) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen added in v0.10.0

func (server *Server) Listen() (err error)

func (*Server) OnNewRTMPPubSession added in v0.13.0

func (server *Server) OnNewRTMPPubSession(session *ServerSession)

ServerSessionObserver

func (*Server) OnNewRTMPSubSession added in v0.13.0

func (server *Server) OnNewRTMPSubSession(session *ServerSession)

ServerSessionObserver

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerObserver

type ServerObserver interface {
	OnNewRTMPPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接
	OnDelRTMPPubSession(session *ServerSession)
	OnNewRTMPSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接
	OnDelRTMPSubSession(session *ServerSession)
}

type ServerSession

type ServerSession struct {
	UniqueKey              string
	AppName                string
	StreamName             string
	StreamNameWithRawQuery string

	// only for SubSession
	IsFresh bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AsyncWrite

func (s *ServerSession) AsyncWrite(msg []byte) error

func (*ServerSession) Dispose

func (s *ServerSession) Dispose()

func (*ServerSession) Flush added in v0.3.0

func (s *ServerSession) Flush() error

func (*ServerSession) ModConnProps added in v0.3.0

func (s *ServerSession) ModConnProps()

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(obs PubSessionObserver)

type ServerSessionObserver

type ServerSessionObserver interface {
	OnNewRTMPPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
	OnNewRTMPSubSession(session *ServerSession)
}

type ServerSessionType

type ServerSessionType int
const (
	ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态
	ServerSessionTypePub
	ServerSessionTypeSub
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL