stream

package
v0.0.0-...-4e62e7e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
// Hook event identifiers. Each constant is a distinct HookEvent value
// (0x1 through 0x8).
const (
	HookEventPublish        = HookEvent(0x1)
	HookEventPublishDone    = HookEvent(0x2)
	HookEventPlay           = HookEvent(0x3)
	HookEventPlayDone       = HookEvent(0x4)
	HookEventRecord         = HookEvent(0x5)
	HookEventIdleTimeout    = HookEvent(0x6)
	HookEventReceiveTimeout = HookEvent(0x7)
	HookEventStarted        = HookEvent(0x8)
)
View Source
// Block counts for receive buffers.
// NOTE(review): presumably the blockCount used by NewUDPReceiveBuffer and
// NewTCPReceiveBuffer respectively — confirm against their implementations.
const (
	ReceiveBufferUdpBlockCount = 300

	ReceiveBufferTCPBlockCount = 50
)
View Source
// Publish source types (SourceType) and output stream protocols
// (TransStreamProtocol).
const (
	SourceTypeRtmp  = SourceType(1)
	SourceType28181 = SourceType(2)
	SourceType1078  = SourceType(3)

	TransStreamRtmp            = TransStreamProtocol(1)
	TransStreamFlv             = TransStreamProtocol(2)
	TransStreamRtsp            = TransStreamProtocol(3)
	TransStreamHls             = TransStreamProtocol(4)
	TransStreamRtc             = TransStreamProtocol(5)
	TransStreamGBStreamForward = TransStreamProtocol(6) // GB (national standard) cascade forwarding
)
View Source
// States of a publish/play session.
const (
	SessionStateCreated          = SessionState(1) // newly created
	SessionStateHandshaking      = SessionState(2) // handshake in progress
	SessionStateHandshakeFailure = SessionState(3) // handshake failed
	SessionStateHandshakeSuccess = SessionState(4) // handshake completed
	SessionStateWaiting          = SessionState(5) // sitting in the waiting queue
	SessionStateTransferring     = SessionState(6) // publishing/playing in progress
	SessionStateClosed           = SessionState(7) // closed
)
View Source
// DefaultMergeWriteLatency is the default merge-write latency.
// NOTE(review): unit is presumably milliseconds, matching the mw_latency
// config field — confirm.
const (
	DefaultMergeWriteLatency = 350
)

Variables

View Source
var SinkManager *sinkManager

SinkManager 目前只用于保存HLS拉流Sink

View Source
var SourceManager *sourceManger

SourceManager 全局管理所有推流源

View Source
var (
	StreamEndInfoBride func(s Source) *StreamEndInfo
)

Functions

func AddSinkToWaitingQueue

func AddSinkToWaitingQueue(streamId string, sink Sink)

func CreateSinkDisconnectionMessage

func CreateSinkDisconnectionMessage(sink Sink) string

func DumpStream2File

func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte)

DumpStream2File 保存推流到文件, 用4字节帧长分割

func EqualsTracks

func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool

func ExistSink

func ExistSink(sourceId string, sinkId SinkID) bool

func ExistSinkInWaitingQueue

func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool

func ExistSourceInWaitingQueue

func ExistSourceInWaitingQueue(id string) bool

func ExtractAudioPacket

func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error)

func ExtractVideoPacket

func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (utils.AVStream, utils.AVPacket, error)

func GetStreamPlayUrls

func GetStreamPlayUrls(source string) []string

func Hook

func Hook(event HookEvent, params string, body interface{}) (*http.Response, error)

func HookIdleTimeoutEvent

func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState)

func HookPlayDoneEvent

func HookPlayDoneEvent(sink Sink) (*http.Response, bool)

func HookPublishDoneEvent

func HookPublishDoneEvent(source Source)

func HookPublishEvent

func HookPublishEvent(source Source) (*http.Response, utils.HookState)

func HookReceiveTimeoutEvent

func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState)

func HookRecordEvent

func HookRecordEvent(source Source, path string)

func InitHookUrls

func InitHookUrls()

func IsSupportMux

func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool

func JoinHostPort

func JoinHostPort(host string, port int) string

func ListenAddr

func ListenAddr(port int) string

func LoopEvent

func LoopEvent(source Source)

LoopEvent 循环读取事件

func NewHookPlayEventInfo

func NewHookPlayEventInfo(sink Sink) eventInfo

func NewHookPublishEventInfo

func NewHookPublishEventInfo(source Source) eventInfo

func NewRecordEventInfo

func NewRecordEventInfo(source Source, path string) interface{}

func ParseUrl

func ParseUrl(name string) (string, url.Values)

ParseUrl 从推拉流url中解析出流id和url参数

func Path2SourceId

func Path2SourceId(path string, suffix string) (string, error)

func PreparePlaySink

func PreparePlaySink(sink Sink) (*http.Response, utils.HookState)

func PreparePlaySinkWithReady

func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookState)

func PreparePublishSource

func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState)

func RegisterTransStreamFactory

func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory)

func SendHookEvent

func SendHookEvent(url string, body []byte) (*http.Response, error)

func SetDefaultConfig

func SetDefaultConfig(config *AppConfig_)

func SetRecordStreamFactory

func SetRecordStreamFactory(factory RecordStreamFactory)

func SinkId2String

func SinkId2String(id SinkID) string

func StartIdleTimer

func StartIdleTimer(source Source) *time.Timer

StartIdleTimer 启动拉流空闲计时器

func StartReceiveDataTimer

func StartReceiveDataTimer(source Source) *time.Timer

StartReceiveDataTimer 启动收流超时计时器

Types

type AppConfig_

type AppConfig_ struct {
	GOPCache            bool   `json:"gop_cache"`       // whether GOP caching is enabled; only one group of audio/video is cached
	GOPBufferSize       int    `json:"gop_buffer_size"` // estimated GOP buffer size; both the AVPacket pool and the merge-write pool use it as a reference
	ProbeTimeout        int    `json:"probe_timeout"`   // timeout for parsing AVStreams out of the incoming stream
	WriteTimeout        int    `json:"write_timeout"`   // timeout for the server sending packets to a TCP play Conn; once exceeded, the Conn is actively closed. Re-pulling costs the client less than buffering costs the server.
	WriteBufferCapacity int    `json:"-"`               // capacity of the send buffer; the buffer is made up of multiple memory blocks
	PublicIP            string `json:"public_ip"`
	ListenIP            string `json:"listen_ip"`
	IdleTimeout         int64  `json:"idle_timeout"`    // how long (in seconds) without any player pulling. If hook notification is enabled, the hook response decides whether to close the Source (200: keep open / non-200: close); otherwise the Source is closed directly.
	ReceiveTimeout      int64  `json:"receive_timeout"` // how long (in seconds) without receiving any stream data. If hook notification is enabled, the hook response decides whether to close the Source (200: keep open / non-200: close); otherwise the Source is closed directly.
	Debug               bool   `json:"debug"`           // debug mode; when enabled the published stream is saved

	// Buffer packets for the given duration and send them to the Sink only once the buffer is full. This lowers the frequency of user-space/kernel-space transitions and greatly improves performance.
	// The merge-write window should be longer than one frame and no longer than one GOP; the same rule is followed when the stream is actually sent.
	MergeWriteLatency int `json:"mw_latency"`
	Rtmp              RtmpConfig
	Hls               HlsConfig
	JT1078            JT1078Config
	Rtsp              RtspConfig
	GB28181           GB28181Config
	WebRtc            WebRtcConfig

	Hooks  HooksConfig
	Record RecordConfig
	Log    LogConfig
	Http   HttpConfig
}

AppConfig_ GOP缓存和合并写必须保持一致,同时开启或关闭. 关闭GOP缓存,是为了降低延迟,很难理解又另外开启合并写.

var AppConfig AppConfig_

func LoadConfigFile

func LoadConfigFile(path string) (*AppConfig_, error)

type BaseSink

type BaseSink struct {
	ID            SinkID
	SourceID      string
	Protocol      TransStreamProtocol
	State         SessionState
	TransStreamID TransStreamID

	DesiredAudioCodecId_ utils.AVCodecID
	DesiredVideoCodecId_ utils.AVCodecID

	Conn         net.Conn // signaling connection of the play (pull) session
	TCPStreaming bool     // whether this sink pulls as a TCP stream

	SentPacketCount int  // count of packets sent
	Ready           bool // whether the sink is ready to be streamed to. A Sink can use this flag to trigger the Source to start pushing without pulling immediately. For example, an RTSP player must fetch the media info during signaling before it pulls the stream.
	// contains filtered or unexported fields
}

func (*BaseSink) Close

func (s *BaseSink) Close()

Close 做如下事情: 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink. 2. 发送PlayDoneHook事件

func (*BaseSink) CreateTime

func (s *BaseSink) CreateTime() time.Time

func (*BaseSink) DesiredAudioCodecId

func (s *BaseSink) DesiredAudioCodecId() utils.AVCodecID

func (*BaseSink) DesiredVideoCodecId

func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID

func (*BaseSink) EnableVideo

func (s *BaseSink) EnableVideo() bool

func (*BaseSink) GetConn

func (s *BaseSink) GetConn() net.Conn

func (*BaseSink) GetID

func (s *BaseSink) GetID() SinkID

func (*BaseSink) GetProtocol

func (s *BaseSink) GetProtocol() TransStreamProtocol

func (*BaseSink) GetSentPacketCount

func (s *BaseSink) GetSentPacketCount() int

func (*BaseSink) GetSourceID

func (s *BaseSink) GetSourceID() string

func (*BaseSink) GetState

func (s *BaseSink) GetState() SessionState

func (*BaseSink) GetTransStreamID

func (s *BaseSink) GetTransStreamID() TransStreamID

func (*BaseSink) IncreaseSentPacketCount

func (s *BaseSink) IncreaseSentPacketCount()

func (*BaseSink) IsReady

func (s *BaseSink) IsReady() bool

func (*BaseSink) IsTCPStreaming

func (s *BaseSink) IsTCPStreaming() bool

func (*BaseSink) Lock

func (s *BaseSink) Lock()

func (*BaseSink) RemoteAddr

func (s *BaseSink) RemoteAddr() string

func (*BaseSink) SetCreateTime

func (s *BaseSink) SetCreateTime(time time.Time)

func (*BaseSink) SetEnableVideo

func (s *BaseSink) SetEnableVideo(enable bool)

func (*BaseSink) SetID

func (s *BaseSink) SetID(id SinkID)

func (*BaseSink) SetReady

func (s *BaseSink) SetReady(ok bool)

func (*BaseSink) SetSentPacketCount

func (s *BaseSink) SetSentPacketCount(count int)

func (*BaseSink) SetState

func (s *BaseSink) SetState(state SessionState)

func (*BaseSink) SetTransStreamID

func (s *BaseSink) SetTransStreamID(id TransStreamID)

func (*BaseSink) SetUrlValues

func (s *BaseSink) SetUrlValues(values url.Values)

func (*BaseSink) StartStreaming

func (s *BaseSink) StartStreaming(stream TransStream) error

func (*BaseSink) StopStreaming

func (s *BaseSink) StopStreaming(stream TransStream)

func (*BaseSink) String

func (s *BaseSink) String() string

func (*BaseSink) UnLock

func (s *BaseSink) UnLock()

func (*BaseSink) UrlValues

func (s *BaseSink) UrlValues() url.Values

func (*BaseSink) Write

func (s *BaseSink) Write(index int, data [][]byte, ts int64) error

type BaseTransStream

type BaseTransStream struct {
	ID         TransStreamID
	Tracks     []*Track
	Completed  bool
	ExistVideo bool
	Protocol   TransStreamProtocol

	OutBuffer     [][]byte // queue of merge-write blocks of the transport stream
	OutBufferSize int      // size of the transport stream's merge-write block queue
}

func (*BaseTransStream) AddTrack

func (t *BaseTransStream) AddTrack(track *Track) error

func (*BaseTransStream) AppendOutStreamBuffer

func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte)

func (*BaseTransStream) ClearOutStreamBuffer

func (t *BaseTransStream) ClearOutStreamBuffer()

func (*BaseTransStream) Close

func (t *BaseTransStream) Close() ([][]byte, int64, error)

func (*BaseTransStream) GetID

func (t *BaseTransStream) GetID() TransStreamID

func (*BaseTransStream) GetProtocol

func (t *BaseTransStream) GetProtocol() TransStreamProtocol

func (*BaseTransStream) GetTracks

func (t *BaseTransStream) GetTracks() []*Track

func (*BaseTransStream) Input

func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error)

func (*BaseTransStream) IsExistVideo

func (t *BaseTransStream) IsExistVideo() bool

func (*BaseTransStream) OutStreamBufferCapacity

func (t *BaseTransStream) OutStreamBufferCapacity() int

func (*BaseTransStream) ReadExtraData

func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error)

func (*BaseTransStream) ReadKeyFrameBuffer

func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error)

func (*BaseTransStream) SetID

func (t *BaseTransStream) SetID(id TransStreamID)

func (*BaseTransStream) SetProtocol

func (t *BaseTransStream) SetProtocol(protocol TransStreamProtocol)

func (*BaseTransStream) TrackSize

func (t *BaseTransStream) TrackSize() int

type BitrateStatistics

type BitrateStatistics struct {
	// contains filtered or unexported fields
}

BitrateStatistics 码流统计, 单位Byte

func NewBitrateStatistics

func NewBitrateStatistics() *BitrateStatistics

func (*BitrateStatistics) Average

func (b *BitrateStatistics) Average() int

Average 返回每秒平均码流大小

func (*BitrateStatistics) Input

func (b *BitrateStatistics) Input(size int)

func (*BitrateStatistics) PreviousSecond

func (b *BitrateStatistics) PreviousSecond() int

PreviousSecond 返回前一秒的码流大小

func (*BitrateStatistics) Total

func (b *BitrateStatistics) Total() int64

Total 返回总码流大小

type EnableConfig

type EnableConfig interface {
	IsEnable() bool

	SetEnable(bool)
}

type GB28181Config

type GB28181Config struct {
	TransportConfig
	Port []int `json:"port"`
	// contains filtered or unexported fields
}

func (*GB28181Config) IsEnable

func (e *GB28181Config) IsEnable() bool

func (GB28181Config) IsMultiPort

func (g GB28181Config) IsMultiPort() bool

func (*GB28181Config) SetEnable

func (e *GB28181Config) SetEnable(b bool)

type GOPBuffer

type GOPBuffer interface {

	// AddPacket returns whether the frame was cached successfully; caching fails if the first frame is not a key frame.
	AddPacket(packet utils.AVPacket) bool

	// SetDiscardHandler sets the callback invoked when a frame is discarded.
	SetDiscardHandler(handler func(packet utils.AVPacket))

	PeekAll(handler func(packet utils.AVPacket))

	Peek(index int) utils.AVPacket

	Size() int

	Clear()

	Close()
}

GOPBuffer GOP缓存

func NewStreamBuffer

func NewStreamBuffer() GOPBuffer

type HlsConfig

type HlsConfig struct {
	Dir            string `json:"dir"`
	Duration       int    `json:"segment_duration"`
	PlaylistLength int    `json:"playlist_length"`
	// contains filtered or unexported fields
}

func (*HlsConfig) IsEnable

func (e *HlsConfig) IsEnable() bool

func (HlsConfig) M3U8Dir

func (c HlsConfig) M3U8Dir(sourceId string) string

M3U8Dir 根据id返回m3u8文件位于磁盘中的绝对目录

func (HlsConfig) M3U8Format

func (c HlsConfig) M3U8Format(sourceId string) string

M3U8Format 根据id返回m3u8文件名

func (HlsConfig) M3U8Path

func (c HlsConfig) M3U8Path(sourceId string) string

M3U8Path 根据sourceId返回m3u8的磁盘路径 切片及目录生成规则, 以SourceId为34020000001320000001/34020000001320000001为例: 创建文件夹34020000001320000001, 34020000001320000001.m3u8文件, 文件列表中切片url为34020000001320000001_seq.ts

func (*HlsConfig) SetEnable

func (e *HlsConfig) SetEnable(b bool)

func (HlsConfig) TSFormat

func (c HlsConfig) TSFormat(sourceId string) string

TSFormat 根据id返回ts文件名

func (HlsConfig) TSPath

func (c HlsConfig) TSPath(sourceId string, tsSeq string) string

TSPath 根据sourceId和ts文件名返回ts的磁盘绝对路径

type HookEvent

type HookEvent int

func (*HookEvent) ToString

func (h *HookEvent) ToString() string

type HooksConfig

type HooksConfig struct {
	Timeout             int64  `json:"timeout"`
	OnStartedUrl        string `json:"on_started"`         // callback after the application has started
	OnPublishUrl        string `json:"on_publish"`         // callback when publishing starts
	OnPublishDoneUrl    string `json:"on_publish_done"`    // callback when publishing ends
	OnPlayUrl           string `json:"on_play"`            // callback when playing (pulling) starts
	OnPlayDoneUrl       string `json:"on_play_done"`       // callback when playing (pulling) ends
	OnRecordUrl         string `json:"on_record"`          // callback for stream recording
	OnIdleTimeoutUrl    string `json:"on_idle_timeout"`    // callback when no sink is pulling
	OnReceiveTimeoutUrl string `json:"on_receive_timeout"` // callback when no stream is being published
	// contains filtered or unexported fields
}

func (*HooksConfig) IsEnable

func (e *HooksConfig) IsEnable() bool

func (*HooksConfig) IsEnableOnIdleTimeout

func (hook *HooksConfig) IsEnableOnIdleTimeout() bool

func (*HooksConfig) IsEnableOnPlay

func (hook *HooksConfig) IsEnableOnPlay() bool

func (*HooksConfig) IsEnableOnPlayDone

func (hook *HooksConfig) IsEnableOnPlayDone() bool

func (*HooksConfig) IsEnableOnPublishDone

func (hook *HooksConfig) IsEnableOnPublishDone() bool

func (*HooksConfig) IsEnableOnReceiveTimeout

func (hook *HooksConfig) IsEnableOnReceiveTimeout() bool

func (*HooksConfig) IsEnableOnRecord

func (hook *HooksConfig) IsEnableOnRecord() bool

func (*HooksConfig) IsEnableOnStarted

func (hook *HooksConfig) IsEnableOnStarted() bool

func (*HooksConfig) IsEnablePublishEvent

func (hook *HooksConfig) IsEnablePublishEvent() bool

func (*HooksConfig) SetEnable

func (e *HooksConfig) SetEnable(b bool)

type HttpConfig

type HttpConfig struct {
	Port int `json:"port"`
}

type IPV4SinkID

type IPV4SinkID uint64

type IPV6SinkID

type IPV6SinkID string

type JT1078Config

type JT1078Config struct {
	// contains filtered or unexported fields
}

func (*JT1078Config) GetPort

func (s *JT1078Config) GetPort() int

func (*JT1078Config) IsEnable

func (e *JT1078Config) IsEnable() bool

func (*JT1078Config) SetEnable

func (e *JT1078Config) SetEnable(b bool)

func (*JT1078Config) SetPort

func (s *JT1078Config) SetPort(port int)

type JitterBuffer

type JitterBuffer struct {
	// contains filtered or unexported fields
}

JitterBuffer 只处理乱序的JitterBuffer

func NewJitterBuffer

func NewJitterBuffer() *JitterBuffer

func (*JitterBuffer) Flush

func (j *JitterBuffer) Flush()

func (*JitterBuffer) Push

func (j *JitterBuffer) Push(seq uint16, packet interface{})

func (*JitterBuffer) SetHandler

func (j *JitterBuffer) SetHandler(handler func(packet interface{}))

type LogConfig

type LogConfig struct {
	FileLogging bool   `json:"file_logging"`
	Level       int    `json:"level"`
	Name        string `json:"name"`
	MaxSize     int    `json:"max_size"` // unit: M (presumably megabytes — confirm)
	MaxBackup   int    `json:"max_backup"`
	MaxAge      int    `json:"max_age"` // in days
	Compress    bool   `json:"compress"`
}

type MergeWritingBuffer

type MergeWritingBuffer interface {
	Allocate(size int, ts int64, videoKey bool) []byte

	// PeekCompletedSegment returns the current completed segment and whether it is a key-frame segment; returns nil if the segment is not yet full.
	PeekCompletedSegment() ([]byte, bool)

	// FlushSegment generates and returns the current segment and whether it is a key-frame segment.
	FlushSegment() ([]byte, bool)

	// IsFull reports whether the current segment is full.
	IsFull(ts int64) bool

	// IsNewSegment reports whether the current segment has had no data written yet.
	IsNewSegment() bool

	// Reserve reserves data of the given length from the current segment.
	Reserve(length int)

	// ReadSegmentsFromKeyFrameIndex returns the most recent key-frame segments.
	ReadSegmentsFromKeyFrameIndex(cb func([]byte))

	Capacity() int
}

MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存. 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. webrtc的I帧间隔可能会高达几十秒, 容量根据write_timeout发送超时和合并写时间来计算, 即write_timeout/mw_latency. 如果I帧间隔大于发送超时时间, 则需要创建新的块.

func NewMergeWritingBuffer

func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer

type PortConfig

type PortConfig interface {
	GetPort() int

	SetPort(port int)
}

type PublishSource

type PublishSource struct {
	ID   string
	Type SourceType

	Conn net.Conn

	TransDeMuxer stream.DeMuxer // parses AVStream and AVPacket out of the publish protocol

	TransStreams map[TransStreamID]TransStream // all output streams, which hold the Sinks

	TransStreamSinks map[TransStreamID]map[SinkID]Sink // Sinks belonging to each output stream
	// contains filtered or unexported fields
}

func (*PublishSource) AddSink

func (s *PublishSource) AddSink(sink Sink)

func (*PublishSource) Close

func (s *PublishSource) Close()

func (*PublishSource) CorrectTimestamp

func (s *PublishSource) CorrectTimestamp(packet utils.AVPacket)

func (*PublishSource) CreateDefaultOutStreams

func (s *PublishSource) CreateDefaultOutStreams()

func (*PublishSource) CreateTime

func (s *PublishSource) CreateTime() time.Time

func (*PublishSource) CreateTransStream

func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)

func (*PublishSource) DispatchBuffer

func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool)

DispatchBuffer 分发传输流

func (*PublishSource) DispatchGOPBuffer

func (s *PublishSource) DispatchGOPBuffer(transStream TransStream)

func (*PublishSource) DispatchPacket

func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket)

DispatchPacket 分发AVPacket

func (*PublishSource) DoClose

func (s *PublishSource) DoClose()

func (*PublishSource) FindOrCreatePacketBuffer

func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool

FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池

func (*PublishSource) FindSink

func (s *PublishSource) FindSink(id SinkID) Sink

func (*PublishSource) GetBitrateStatistics

func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics

func (*PublishSource) GetID

func (s *PublishSource) GetID() string

func (*PublishSource) GetStreamEndInfo

func (s *PublishSource) GetStreamEndInfo() *StreamEndInfo

func (*PublishSource) GetTransStreams

func (s *PublishSource) GetTransStreams() map[TransStreamID]TransStream

func (*PublishSource) GetType

func (s *PublishSource) GetType() SourceType

func (*PublishSource) Init

func (s *PublishSource) Init(receiveQueueSize int)

func (*PublishSource) Input

func (s *PublishSource) Input(data []byte) error

func (*PublishSource) IsClosed

func (s *PublishSource) IsClosed() bool

func (*PublishSource) IsCompleted

func (s *PublishSource) IsCompleted() bool

func (*PublishSource) LastPacketTime

func (s *PublishSource) LastPacketTime() time.Time

func (*PublishSource) LastStreamEndTime

func (s *PublishSource) LastStreamEndTime() time.Time

func (*PublishSource) MainContextEvents

func (s *PublishSource) MainContextEvents() chan func()

func (*PublishSource) NotTrackAdded

func (s *PublishSource) NotTrackAdded(index int) bool

NotTrackAdded 返回该index对应的track是否没有添加

func (*PublishSource) OnDeMuxDone

func (s *PublishSource) OnDeMuxDone()

func (*PublishSource) OnDeMuxPacket

func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket)

func (*PublishSource) OnDeMuxStream

func (s *PublishSource) OnDeMuxStream(stream utils.AVStream)

func (*PublishSource) OnDeMuxStreamDone

func (s *PublishSource) OnDeMuxStreamDone()

func (*PublishSource) OnDiscardPacket

func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket)

OnDiscardPacket GOP缓存溢出丢弃回调

func (*PublishSource) OriginTracks

func (s *PublishSource) OriginTracks() []*Track

func (*PublishSource) PostEvent

func (s *PublishSource) PostEvent(cb func())

func (*PublishSource) RemoteAddr

func (s *PublishSource) RemoteAddr() string

func (*PublishSource) RemoveSink

func (s *PublishSource) RemoveSink(sink Sink)

func (*PublishSource) RemoveSinkWithID

func (s *PublishSource) RemoveSinkWithID(id SinkID)

func (*PublishSource) SetCreateTime

func (s *PublishSource) SetCreateTime(time time.Time)

func (*PublishSource) SetID

func (s *PublishSource) SetID(id string)

func (*PublishSource) SetLastPacketTime

func (s *PublishSource) SetLastPacketTime(time2 time.Time)

func (*PublishSource) SetState

func (s *PublishSource) SetState(state SessionState)

func (*PublishSource) SetType

func (s *PublishSource) SetType(sourceType SourceType)

func (*PublishSource) SetUrlValues

func (s *PublishSource) SetUrlValues(values url.Values)

func (*PublishSource) SinkCount

func (s *PublishSource) SinkCount() int

func (*PublishSource) Sinks

func (s *PublishSource) Sinks() []Sink

func (*PublishSource) State

func (s *PublishSource) State() SessionState

func (*PublishSource) StreamPipe

func (s *PublishSource) StreamPipe() chan []byte

func (*PublishSource) String

func (s *PublishSource) String() string

func (*PublishSource) TranscodeTracks

func (s *PublishSource) TranscodeTracks() []*Track

func (*PublishSource) UrlValues

func (s *PublishSource) UrlValues() url.Values

type ReceiveBuffer

type ReceiveBuffer struct {
	// contains filtered or unexported fields
}

ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的, 从解析到发送可能耗时, 从而影响读取网络流. 使用收流缓冲区, 可有效降低出现此问题的概率. 从网络IO读取数据->送给解复用器, 此过程需做到无内存拷贝. rtmp和1078推流直接使用ReceiveBuffer. 国标推流, UDP收流都要经过jitter buffer处理, 还是需要拷贝一次, 没必要使用ReceiveBuffer. TCP全都使用ReceiveBuffer, 区别在于: 多端口模式, 第一包传给source; 单端口模式先解析出ssrc, 找到source, 后续再传给source.

func NewReceiveBuffer

func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer

func NewTCPReceiveBuffer

func NewTCPReceiveBuffer() *ReceiveBuffer

func NewUDPReceiveBuffer

func NewUDPReceiveBuffer() *ReceiveBuffer

func (*ReceiveBuffer) BlockCount

func (r *ReceiveBuffer) BlockCount() int

func (*ReceiveBuffer) Get

func (r *ReceiveBuffer) Get(index int) []byte

func (*ReceiveBuffer) GetBlock

func (r *ReceiveBuffer) GetBlock() []byte

func (*ReceiveBuffer) Index

func (r *ReceiveBuffer) Index() int

type RecordConfig

type RecordConfig struct {
	Format string `json:"format"`
	Dir    string `json:"dir"`
	// contains filtered or unexported fields
}

func (*RecordConfig) IsEnable

func (e *RecordConfig) IsEnable() bool

func (*RecordConfig) SetEnable

func (e *RecordConfig) SetEnable(b bool)

type RecordStreamFactory

type RecordStreamFactory func(source string) (Sink, string, error)

type RtmpConfig

type RtmpConfig struct {
	// contains filtered or unexported fields
}

func (*RtmpConfig) GetPort

func (s *RtmpConfig) GetPort() int

func (*RtmpConfig) IsEnable

func (e *RtmpConfig) IsEnable() bool

func (*RtmpConfig) SetEnable

func (e *RtmpConfig) SetEnable(b bool)

func (*RtmpConfig) SetPort

func (s *RtmpConfig) SetPort(port int)

type RtspConfig

type RtspConfig struct {
	TransportConfig

	Port     []int  `json:"port"`
	Password string `json:"password"`
	// contains filtered or unexported fields
}

func (*RtspConfig) IsEnable

func (e *RtspConfig) IsEnable() bool

func (RtspConfig) IsMultiPort

func (g RtspConfig) IsMultiPort() bool

func (*RtspConfig) SetEnable

func (e *RtspConfig) SetEnable(b bool)

type SessionHandler

type SessionHandler[T any] interface {
	OnNewSession(conn net.Conn) T

	OnCloseSession(session T)
}

type SessionState

type SessionState uint32

SessionState 推拉流Session的状态, 包含握手和Hook授权阶段.

func (SessionState) String

func (s SessionState) String() string

type Sink

type Sink interface {
	GetID() SinkID

	SetID(sink SinkID)

	GetSourceID() string

	Write(index int, data [][]byte, ts int64) error

	GetTransStreamID() TransStreamID

	SetTransStreamID(id TransStreamID)

	GetProtocol() TransStreamProtocol

	// GetState returns the Sink state; the caller must take the lock manually before calling.
	GetState() SessionState

	// SetState sets the Sink state; the caller must take the lock manually before calling.
	SetState(state SessionState)

	EnableVideo() bool

	// SetEnableVideo sets whether the video stream is pulled, allowing a client to pull audio only.
	SetEnableVideo(enable bool)

	// DesiredAudioCodecId lets a client pull a specific audio stream.
	DesiredAudioCodecId() utils.AVCodecID

	// DesiredVideoCodecId lets a client pull a specific video stream.
	DesiredVideoCodecId() utils.AVCodecID

	// Close closes and releases the Sink, removing it from the transport stream or the waiting queue.
	Close()

	String() string

	RemoteAddr() string

	// Lock: the whole sequence "Sink requests play -> Source pushes -> Sink disconnects" is lock-free and thread-safe.
	// The sequence "Sink in the waiting queue -> Sink disconnects" is not thread-safe,
	// so when the Source runs AddSink while the Sink is in the SessionStateWaiting state, it must be protected by the lock.
	Lock()

	UnLock()

	UrlValues() url.Values

	SetUrlValues(values url.Values)

	// StartStreaming is called when the Source starts pushing the stream to the Sink.
	StartStreaming(stream TransStream) error

	// StopStreaming is called when the Source stops pushing the stream to the Sink.
	StopStreaming(stream TransStream)

	GetConn() net.Conn

	IsTCPStreaming() bool

	GetSentPacketCount() int

	SetSentPacketCount(int)

	IncreaseSentPacketCount()

	IsReady() bool

	SetReady(ok bool)

	CreateTime() time.Time

	SetCreateTime(time time.Time)
}

Sink 对拉流端的封装

func CreateRecordStream

func CreateRecordStream(sourceId string) (Sink, string, error)

func PopWaitingSinks

func PopWaitingSinks(sourceId string) []Sink

func RemoveSinkFromWaitingQueue

func RemoveSinkFromWaitingQueue(sourceId string, sinkId SinkID) (Sink, bool)

type SinkID

type SinkID interface{}

SinkID IPV4使用uint64、IPV6使用string作为ID类型

func NetAddr2SinkId

func NetAddr2SinkId(addr net.Addr) SinkID

NetAddr2SinkId 根据网络地址生成SinkId IPV4使用一个uint64, IPV6使用String

type Source

type Source interface {
	// GetID returns the source ID.
	GetID() string

	SetID(id string)

	// Input feeds published stream data into the source.
	Input(data []byte) error

	// GetType returns the publish type.
	GetType() SourceType

	SetType(sourceType SourceType)

	// OriginTracks returns all published tracks.
	OriginTracks() []*Track

	// TranscodeTracks returns all transcoded tracks.
	TranscodeTracks() []*Track

	// AddSink adds a Sink; make sure the Sink has completed handshake and authorization beforehand. If the Source has not yet done WriteHeader, the Sink is first added to the waiting queue.
	// It matches the codecs desired by the player, then creates a TransStream or adds the Sink to an existing TransStream.
	AddSink(sink Sink)

	// RemoveSink removes a Sink.
	RemoveSink(sink Sink)

	RemoveSinkWithID(id SinkID)

	FindSink(id SinkID) Sink

	SetState(state SessionState)

	// Close closes the Source:
	// it closes the publish network connection, stops all muxing, stream forwarding and transcoding work,
	// and adds the Sinks to the waiting queue.
	Close()

	DoClose()

	// IsCompleted reports whether all published tracks have been parsed.
	IsCompleted() bool

	// FindOrCreatePacketBuffer finds or creates the memory pool for AVPackets.
	FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool

	// OnDiscardPacket is the GOP-cache overflow callback; it releases the AVPacket.
	OnDiscardPacket(pkt utils.AVPacket)

	// OnDeMuxStream is the callback invoked when an AVStream has been parsed.
	OnDeMuxStream(stream utils.AVStream)

	// OnDeMuxStreamDone is the callback invoked when all tracks have been parsed; later OnDeMuxStream callbacks are no longer handled.
	OnDeMuxStreamDone()

	// OnDeMuxPacket is the callback invoked when an AVPacket has been parsed.
	OnDeMuxPacket(packet utils.AVPacket)

	// OnDeMuxDone is the callback invoked when all streams have been parsed.
	OnDeMuxDone()

	Init(receiveQueueSize int)

	RemoteAddr() string

	String() string

	State() SessionState

	// UrlValues returns the publish URL parameters.
	UrlValues() url.Values

	// SetUrlValues sets the publish URL parameters.
	SetUrlValues(values url.Values)

	// PostEvent switches to the main goroutine to execute the given function.
	PostEvent(cb func())

	// LastPacketTime returns the time stream data was most recently received.
	LastPacketTime() time.Time

	SetLastPacketTime(time2 time.Time)

	// SinkCount returns the number of sinks pulling the stream.
	SinkCount() int

	// LastStreamEndTime returns the time pulling most recently ended.
	LastStreamEndTime() time.Time

	IsClosed() bool

	StreamPipe() chan []byte

	MainContextEvents() chan func()

	CreateTime() time.Time

	SetCreateTime(time time.Time)

	Sinks() []Sink

	GetBitrateStatistics() *BitrateStatistics

	GetTransStreams() map[TransStreamID]TransStream

	GetStreamEndInfo() *StreamEndInfo
}

Source 对推流源的封装

type SourceType

type SourceType byte

SourceType 推流类型

func (SourceType) String

func (s SourceType) String() string

type StreamEndInfo

type StreamEndInfo struct {
	ID             string
	Timestamps     map[utils.AVCodecID][2]int64 // end timestamps of each track
	M3U8Writer     libhls.M3U8Writer            // retained M3U8 generator
	PlaylistFormat *string                      // M3U8 playlist
	RtspTracks     map[byte]uint16              // ending sequence number of each RTSP track
	FLVPrevTagSize uint32                       // size of the last FLV tag; used as the prev tag size the next time FLV is generated
}

StreamEndInfo 保留结束推流Source的推流信息. 在结束推流时,如果还有拉流端没有断开,则保留一些推流信息(目前有时间戳、ts切片序号等等)。在下次推流时,使用该时间戳作为新传输流的起始时间戳,保证拉流端在拉流时不会出现pts和dts错误。如果在此之前,陆续有拉流端断开,直至sink计数为0,则不再保留该信息。

type StreamEndInfoManager

type StreamEndInfoManager struct {
	// contains filtered or unexported fields
}

func (*StreamEndInfoManager) Add

func (s *StreamEndInfoManager) Add(history *StreamEndInfo)

func (*StreamEndInfoManager) Remove

func (s *StreamEndInfoManager) Remove(id string) *StreamEndInfo

type StreamServer

type StreamServer[T any] struct {
	SourceType SourceType
	Handler    SessionHandler[T]
}

func (*StreamServer[T]) OnConnected

func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte

func (*StreamServer[T]) OnDisConnected

func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error)

func (*StreamServer[T]) OnPacket

func (s *StreamServer[T]) OnPacket(conn net.Conn, data []byte) []byte

type TCPTransStream

type TCPTransStream struct {
	BaseTransStream

	// Merge-write memory leak issue: after publishing ends, mwBuffer's data is never released; the memory is freed only after every player has disconnected.
	// A code-level bug was suspected at first, but testing showed that if the merge-write segment is copied once more before being sent to the sink, mwBuffer's data block is released correctly after publishing ends and only the copied block stays unreleased. That rules out the code layer as the cause of the leak.
	// It appears conn keeps holding data after write. Reading the code confirms it: the buffer references data before sending to the fd, but is never set to nil afterwards to drop the reference. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694
	MWBuffer MergeWritingBuffer // merge-write buffer; also serves as the user-space send buffer
}

func (*TCPTransStream) OutStreamBufferCapacity

func (t *TCPTransStream) OutStreamBufferCapacity() int

type Track

type Track struct {
	Stream        utils.AVStream
	Pts           int64 // latest PTS
	Dts           int64 // latest DTS
	FrameDuration int   // duration of a single frame; timebase is the same as the published stream's
}

func NewTrack

func NewTrack(stream utils.AVStream, dts, pts int64) *Track

type TrackManager

type TrackManager struct {
	// contains filtered or unexported fields
}

func (*TrackManager) Add

func (s *TrackManager) Add(track *Track)

func (*TrackManager) All

func (s *TrackManager) All() []*Track

func (*TrackManager) Find

func (s *TrackManager) Find(id utils.AVCodecID) *Track

func (*TrackManager) FindTracks

func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track

func (*TrackManager) FindTracksWithType

func (s *TrackManager) FindTracksWithType(mediaType utils.AVMediaType) []*Track

func (*TrackManager) FindWithType

func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track

type TransStream

type TransStream interface {
	GetID() TransStreamID

	SetID(id TransStreamID)

	// Input muxes the packet into the transport stream; it returns the merge-write blocks, the timestamp, and whether the merge-write blocks contain a video key frame.
	Input(packet utils.AVPacket) ([][]byte, int64, bool, error)

	AddTrack(track *Track) error

	TrackSize() int

	GetTracks() []*Track

	// WriteHeader is called to signal that all tracks have been added.
	WriteHeader() error

	// GetProtocol returns the output stream protocol.
	GetProtocol() TransStreamProtocol

	SetProtocol(protocol TransStreamProtocol)

	// ReadExtraData reads the codec extra data of the transport stream.
	ReadExtraData(timestamp int64) ([][]byte, int64, error)

	// ReadKeyFrameBuffer reads the most recent merge-write queue that contains a video key frame.
	ReadKeyFrameBuffer() ([][]byte, int64, error)

	// Close closes the transport stream and returns the merge-write blocks that have not been flushed yet.
	Close() ([][]byte, int64, error)

	// ClearOutStreamBuffer clears the transport stream's merge-write block queue.
	ClearOutStreamBuffer()

	// AppendOutStreamBuffer appends a merge-write block to the queue.
	AppendOutStreamBuffer(buffer []byte)

	// OutStreamBufferCapacity returns the capacity of the merge-write block queue, which is used as the queue size for a sink's asynchronous streaming.
	OutStreamBufferCapacity() int

	IsExistVideo() bool
}

TransStream 将AVPacket封装成传输流

func CreateTransStream

func CreateTransStream(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)

type TransStreamFactory

type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)

func FindTransStreamFactory

func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error)

type TransStreamID

type TransStreamID uint64

TransStreamID 每个传输流的唯一Id, 根据输出流协议ID+流包含的音视频编码器ID生成. 输出流协议ID占用高8位, 每个音视频编码器ID占用8位, 意味着每个输出流至多7路流.

func GenerateTransStreamID

func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) TransStreamID

GenerateTransStreamID 根据输出流协议和输出流包含的音视频编码器ID生成流ID

type TransStreamProtocol

type TransStreamProtocol uint32

TransStreamProtocol 输出的流协议

func (TransStreamProtocol) String

func (p TransStreamProtocol) String() string

type TransportConfig

type TransportConfig struct {
	Transport string `json:"transport"` //"UDP|TCP"
}

func (TransportConfig) IsEnableTCP

func (g TransportConfig) IsEnableTCP() bool

func (TransportConfig) IsEnableUDP

func (g TransportConfig) IsEnableUDP() bool

type WebRtcConfig

type WebRtcConfig struct {
	TransportConfig
	// contains filtered or unexported fields
}

func (*WebRtcConfig) GetPort

func (s *WebRtcConfig) GetPort() int

func (*WebRtcConfig) IsEnable

func (e *WebRtcConfig) IsEnable() bool

func (*WebRtcConfig) SetEnable

func (e *WebRtcConfig) SetEnable(b bool)

func (*WebRtcConfig) SetPort

func (s *WebRtcConfig) SetPort(port int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL