rtmp

package
v0.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2020 License: MIT Imports: 16 Imported by: 7

Documentation

Index

Constants

View Source
const (
	AMF0TypeMarkerNumber     = uint8(0x00)
	AMF0TypeMarkerBoolean    = uint8(0x01)
	AMF0TypeMarkerString     = uint8(0x02)
	AMF0TypeMarkerObject     = uint8(0x03)
	AMF0TypeMarkerNull       = uint8(0x05)
	AMF0TypeMarkerEcmaArray  = uint8(0x08)
	AMF0TypeMarkerObjectEnd  = uint8(0x09)
	AMF0TypeMarkerLongString = uint8(0x0c)
)
View Source
const (
	CSIDAMF   = 5
	CSIDAudio = 6
	CSIDVideo = 7
)
View Source
const (
	TypeidAudio           = uint8(8)
	TypeidVideo           = uint8(9)
	TypeidDataMessageAMF0 = uint8(18) // meta

)
View Source
const (
	SoundFormatAAC uint8 = 10

	AVCKeyFrame   = frameTypeKey<<4 | codecIDAVC
	AVCInterFrame = frameTypeInter<<4 | codecIDAVC

	HEVCKeyFrame   = frameTypeKey<<4 | codecIDHEVC
	HEVCInterFrame = frameTypeInter<<4 | codecIDHEVC

	AVCPacketTypeSeqHeader uint8 = 0
	AVCPacketTypeNALU      uint8 = 1

	HEVCPacketTypeSeqHeader uint8 = 0
	HEVCPacketTypeNALU      uint8 = 1

	AACPacketTypeSeqHeader uint8 = 0
	AACPacketTypeRaw       uint8 = 1
)

这部分内容,和httpflv中的类似

View Source
const (
	//MSID0 = 0 // 所有除 publish、play、onStatus 之外的信令
	MSID1 = 1 // publish、play、onStatus 以及 音视频数据
)

Variables

View Source
var (
	ErrAMFInvalidType = errors.New("lal.rtmp: invalid amf0 type")
	ErrAMFTooShort    = errors.New("lal.rtmp: too short to unmarshal amf0 data")
	ErrAMFNotExist    = errors.New("lal.rtmp: not exist")
)
View Source
var AMF0 amf0
View Source
var AMF0TypeMarkerObjectEndBytes = []byte{0, 0, AMF0TypeMarkerObjectEnd}
View Source
var ErrClientSessionTimeout = errors.New("lal.rtmp: client session timeout")
View Source
var ErrRTMP = errors.New("lal.rtmp: fxxk")
View Source
var (
	LocalChunkSize = 4096 // 本端设置的 chunk size

)

一些更专业的配置项,暂时只在该源码文件中配置,不提供外部配置接口

Functions

func Message2Chunks

func Message2Chunks(message []byte, header *Header) []byte

@param header 注意,内部使用 TimestampAbs 而非 Timestamp

Types

type AVMsg added in v0.5.0

type AVMsg struct {
	Header  Header
	Payload []byte // 不包含 rtmp 头
}

func (AVMsg) IsAACSeqHeader added in v0.7.0

func (msg AVMsg) IsAACSeqHeader() bool

func (AVMsg) IsAVCKeyNALU added in v0.13.0

func (msg AVMsg) IsAVCKeyNALU() bool

func (AVMsg) IsAVCKeySeqHeader added in v0.7.0

func (msg AVMsg) IsAVCKeySeqHeader() bool

func (AVMsg) IsHEVCKeyNALU added in v0.13.0

func (msg AVMsg) IsHEVCKeyNALU() bool

func (AVMsg) IsHEVCKeySeqHeader added in v0.8.0

func (msg AVMsg) IsHEVCKeySeqHeader() bool

func (AVMsg) IsVideoKeyNALU added in v0.13.0

func (msg AVMsg) IsVideoKeyNALU() bool

func (AVMsg) IsVideoKeySeqHeader added in v0.8.0

func (msg AVMsg) IsVideoKeySeqHeader() bool

type ChunkComposer

type ChunkComposer struct {
	// contains filtered or unexported fields
}

func NewChunkComposer

func NewChunkComposer() *ChunkComposer

func (*ChunkComposer) RunLoop

func (c *ChunkComposer) RunLoop(reader io.Reader, cb OnCompleteMessage) error

@param cb 回调结束后,内存块会被 ChunkComposer 再次使用

func (*ChunkComposer) SetPeerChunkSize

func (c *ChunkComposer) SetPeerChunkSize(val uint32)

type ChunkDivider added in v0.3.0

type ChunkDivider struct {
	// contains filtered or unexported fields
}

func (*ChunkDivider) Message2Chunks added in v0.3.0

func (d *ChunkDivider) Message2Chunks(message []byte, header *Header) []byte

TODO chef: 新的 message 的第一个 chunk 始终使用 fmt0 格式,没有参考前一个 message

type ClientSession

type ClientSession struct {
	UniqueKey string
	// contains filtered or unexported fields
}

rtmp 客户端类型连接的底层实现 package rtmp 的使用者应该优先使用基于 ClientSession 实现的 PushSession 和 PullSession

func NewClientSession

func NewClientSession(t ClientSessionType, modOptions ...ModClientSessionOption) *ClientSession

@param t: session的类型,只能是推或者拉

func (*ClientSession) AsyncWrite added in v0.3.0

func (s *ClientSession) AsyncWrite(msg []byte) error

func (*ClientSession) Dispose added in v0.3.0

func (s *ClientSession) Dispose()

func (*ClientSession) Done added in v0.11.0

func (s *ClientSession) Done() <-chan error

func (*ClientSession) Flush added in v0.3.0

func (s *ClientSession) Flush() error

type ClientSessionOption added in v0.5.0

type ClientSessionOption struct {
	// 单位毫秒,如果为0,则没有超时
	ConnectTimeoutMS int // 建立连接超时
	DoTimeoutMS      int // 从发起连接(包含了建立连接的时间)到收到publish或play信令结果的超时
	ReadAVTimeoutMS  int // 读取音视频数据的超时
	WriteAVTimeoutMS int // 发送音视频数据的超时
}

type ClientSessionType

type ClientSessionType int
const (
	CSTPullSession ClientSessionType = iota
	CSTPushSession
)

type HandshakeClient

type HandshakeClient interface {
	WriteC0C1(writer io.Writer) error
	ReadS0S1S2(reader io.Reader) error
	WriteC2(writer io.Writer) error
}

type HandshakeClientComplex added in v0.1.0

type HandshakeClientComplex struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientComplex) ReadS0S1S2 added in v0.1.0

func (c *HandshakeClientComplex) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientComplex) WriteC0C1 added in v0.1.0

func (c *HandshakeClientComplex) WriteC0C1(writer io.Writer) error

func (*HandshakeClientComplex) WriteC2 added in v0.1.0

func (c *HandshakeClientComplex) WriteC2(write io.Writer) error

type HandshakeClientSimple added in v0.1.0

type HandshakeClientSimple struct {
	// contains filtered or unexported fields
}

func (*HandshakeClientSimple) ReadS0S1S2 added in v0.1.0

func (c *HandshakeClientSimple) ReadS0S1S2(reader io.Reader) error

func (*HandshakeClientSimple) WriteC0C1 added in v0.1.0

func (c *HandshakeClientSimple) WriteC0C1(writer io.Writer) error

func (*HandshakeClientSimple) WriteC2 added in v0.1.0

func (c *HandshakeClientSimple) WriteC2(write io.Writer) error

type HandshakeServer

type HandshakeServer struct {
	// contains filtered or unexported fields
}

func (*HandshakeServer) ReadC0C1

func (s *HandshakeServer) ReadC0C1(reader io.Reader) (err error)

func (*HandshakeServer) ReadC2

func (s *HandshakeServer) ReadC2(reader io.Reader) error

func (*HandshakeServer) WriteS0S1S2

func (s *HandshakeServer) WriteS0S1S2(write io.Writer) error
type Header struct {
	CSID        int
	MsgLen      uint32 // 不包含header的大小
	Timestamp   uint32 // NOTICE 是 rtmp 协议 header 中的时间戳,可能是绝对的,也可能是相对的。上层不应该使用这个字段,而应该使用TimestampAbs
	MsgTypeID   uint8  // 8 audio 9 video 18 metadata
	MsgStreamID int

	TimestampAbs uint32 // 经过计算得到的流上的绝对时间戳
}

TODO chef: 将Timestamp字段隐藏,不对外暴露

type MessagePacker

type MessagePacker struct {
	// contains filtered or unexported fields
}

func NewMessagePacker

func NewMessagePacker() *MessagePacker

type ModClientSessionOption added in v0.5.0

type ModClientSessionOption func(option *ClientSessionOption)

type ModPullSessionOption added in v0.5.0

type ModPullSessionOption func(option *PullSessionOption)

type ModPushSessionOption added in v0.5.0

type ModPushSessionOption func(option *PushSessionOption)

type ObjectPair

type ObjectPair struct {
	Key   string
	Value interface{}
}

type ObjectPairArray added in v0.12.0

type ObjectPairArray []ObjectPair

func ParseMetadata added in v0.12.0

func ParseMetadata(b []byte) (ObjectPairArray, error)

func (ObjectPairArray) Find added in v0.12.0

func (o ObjectPairArray) Find(key string) interface{}

func (ObjectPairArray) FindString added in v0.12.0

func (o ObjectPairArray) FindString(key string) (string, error)

type OnCompleteMessage added in v0.13.0

type OnCompleteMessage func(stream *Stream) error

type OnReadRTMPAVMsg added in v0.5.0

type OnReadRTMPAVMsg func(msg AVMsg)

type PubSessionObserver

type PubSessionObserver interface {
	// 注意,回调结束后,内部会复用Payload内存块
	OnReadRTMPAVMsg(msg AVMsg)
}

type PullSession

type PullSession struct {
	// contains filtered or unexported fields
}

func NewPullSession

func NewPullSession(modOptions ...ModPullSessionOption) *PullSession

func (*PullSession) Dispose added in v0.5.0

func (s *PullSession) Dispose()

func (*PullSession) Done added in v0.12.0

func (s *PullSession) Done() <-chan error

func (*PullSession) Pull

func (s *PullSession) Pull(rawURL string, onReadRTMPAVMsg OnReadRTMPAVMsg) error

建立rtmp play连接 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误

@param onReadRTMPAVMsg: 注意,回调结束后,内存块会被PullSession重复使用

func (*PullSession) UniqueKey added in v0.12.0

func (s *PullSession) UniqueKey() string

type PullSessionOption added in v0.5.0

type PullSessionOption struct {
	ConnectTimeoutMS int
	PullTimeoutMS    int
	ReadAVTimeoutMS  int
}

type PushSession

type PushSession struct {
	IsFresh bool
	// contains filtered or unexported fields
}

func NewPushSession

func NewPushSession(modOptions ...ModPushSessionOption) *PushSession

func (*PushSession) AsyncWrite added in v0.5.0

func (s *PushSession) AsyncWrite(msg []byte) error

func (*PushSession) Dispose added in v0.5.0

func (s *PushSession) Dispose()

func (*PushSession) Done added in v0.11.0

func (s *PushSession) Done() <-chan error

func (*PushSession) Flush added in v0.5.0

func (s *PushSession) Flush() error

func (*PushSession) Push

func (s *PushSession) Push(rawURL string) error

建立rtmp publish连接 阻塞直到收到服务端返回的rtmp publish对应结果的信令,或发生错误

func (*PushSession) UniqueKey added in v0.11.0

func (s *PushSession) UniqueKey() string

type PushSessionOption added in v0.5.0

type PushSessionOption struct {
	ConnectTimeoutMS int
	PushTimeoutMS    int
	WriteAVTimeoutMS int
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(obs ServerObserver, addr string) *Server

func (*Server) Dispose

func (server *Server) Dispose()

func (*Server) Listen added in v0.10.0

func (server *Server) Listen() (err error)

func (*Server) OnNewRTMPPubSession added in v0.13.0

func (server *Server) OnNewRTMPPubSession(session *ServerSession)

ServerSessionObserver

func (*Server) OnNewRTMPSubSession added in v0.13.0

func (server *Server) OnNewRTMPSubSession(session *ServerSession)

ServerSessionObserver

func (*Server) RunLoop

func (server *Server) RunLoop() error

type ServerObserver

type ServerObserver interface {
	OnNewRTMPPubSession(session *ServerSession) bool // 返回true则允许推流,返回false则强制关闭这个连接
	OnDelRTMPPubSession(session *ServerSession)
	OnNewRTMPSubSession(session *ServerSession) bool // 返回true则允许拉流,返回false则强制关闭这个连接
	OnDelRTMPSubSession(session *ServerSession)
}

type ServerSession

type ServerSession struct {
	UniqueKey              string
	AppName                string
	StreamName             string
	StreamNameWithRawQuery string

	// only for SubSession
	IsFresh bool
	// contains filtered or unexported fields
}

func NewServerSession

func NewServerSession(obs ServerSessionObserver, conn net.Conn) *ServerSession

func (*ServerSession) AsyncWrite

func (s *ServerSession) AsyncWrite(msg []byte) error

func (*ServerSession) Dispose

func (s *ServerSession) Dispose()

func (*ServerSession) Flush added in v0.3.0

func (s *ServerSession) Flush() error

func (*ServerSession) ModConnProps added in v0.3.0

func (s *ServerSession) ModConnProps()

func (*ServerSession) RunLoop

func (s *ServerSession) RunLoop() (err error)

func (*ServerSession) SetPubSessionObserver

func (s *ServerSession) SetPubSessionObserver(obs PubSessionObserver)

type ServerSessionObserver

type ServerSessionObserver interface {
	OnNewRTMPPubSession(session *ServerSession) // 上层代码应该在这个事件回调中注册音视频数据的监听
	OnNewRTMPSubSession(session *ServerSession)
}

type ServerSessionType

type ServerSessionType int
const (
	ServerSessionTypeUnknown ServerSessionType = iota // 收到客户端的publish或者play信令之前的类型状态
	ServerSessionTypePub
	ServerSessionTypeSub
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

func NewStream

func NewStream() *Stream

type StreamMsg

type StreamMsg struct {
	// contains filtered or unexported fields
}

TODO chef: 将这个buffer实现和bytes.Buffer做比较,考虑将它放入naza package中

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL