Documentation ¶
Index ¶
- Constants
- Variables
- func AddSinkToWaitingQueue(streamId string, sink Sink)
- func CreateSinkDisconnectionMessage(sink Sink) string
- func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte)
- func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool
- func ExistSink(sourceId string, sinkId SinkID) bool
- func ExistSinkInWaitingQueue(sourceId string, sinkId SinkID) bool
- func ExistSourceInWaitingQueue(id string) bool
- func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, ...) (utils.AVStream, utils.AVPacket, error)
- func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, ...) (utils.AVStream, utils.AVPacket, error)
- func GetStreamPlayUrls(source string) []string
- func Hook(event HookEvent, params string, body interface{}) (*http.Response, error)
- func HookIdleTimeoutEvent(source Source) (*http.Response, utils.HookState)
- func HookPlayDoneEvent(sink Sink) (*http.Response, bool)
- func HookPublishDoneEvent(source Source)
- func HookPublishEvent(source Source) (*http.Response, utils.HookState)
- func HookReceiveTimeoutEvent(source Source) (*http.Response, utils.HookState)
- func HookRecordEvent(source Source, path string)
- func InitHookUrls()
- func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool
- func JoinHostPort(host string, port int) string
- func ListenAddr(port int) string
- func LoopEvent(source Source)
- func NewHookPlayEventInfo(sink Sink) eventInfo
- func NewHookPublishEventInfo(source Source) eventInfo
- func NewRecordEventInfo(source Source, path string) interface{}
- func ParseUrl(name string) (string, url.Values)
- func Path2SourceId(path string, suffix string) (string, error)
- func PreparePlaySink(sink Sink) (*http.Response, utils.HookState)
- func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookState)
- func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState)
- func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory)
- func SendHookEvent(url string, body []byte) (*http.Response, error)
- func SetDefaultConfig(config *AppConfig_)
- func SetRecordStreamFactory(factory RecordStreamFactory)
- func SinkId2String(id SinkID) string
- func StartIdleTimer(source Source) *time.Timer
- func StartReceiveDataTimer(source Source) *time.Timer
- type AppConfig_
- type BaseSink
- func (s *BaseSink) Close()
- func (s *BaseSink) CreateTime() time.Time
- func (s *BaseSink) DesiredAudioCodecId() utils.AVCodecID
- func (s *BaseSink) DesiredVideoCodecId() utils.AVCodecID
- func (s *BaseSink) EnableVideo() bool
- func (s *BaseSink) GetConn() net.Conn
- func (s *BaseSink) GetID() SinkID
- func (s *BaseSink) GetProtocol() TransStreamProtocol
- func (s *BaseSink) GetSentPacketCount() int
- func (s *BaseSink) GetSourceID() string
- func (s *BaseSink) GetState() SessionState
- func (s *BaseSink) GetTransStreamID() TransStreamID
- func (s *BaseSink) IncreaseSentPacketCount()
- func (s *BaseSink) IsReady() bool
- func (s *BaseSink) IsTCPStreaming() bool
- func (s *BaseSink) Lock()
- func (s *BaseSink) RemoteAddr() string
- func (s *BaseSink) SetCreateTime(time time.Time)
- func (s *BaseSink) SetEnableVideo(enable bool)
- func (s *BaseSink) SetID(id SinkID)
- func (s *BaseSink) SetReady(ok bool)
- func (s *BaseSink) SetSentPacketCount(count int)
- func (s *BaseSink) SetState(state SessionState)
- func (s *BaseSink) SetTransStreamID(id TransStreamID)
- func (s *BaseSink) SetUrlValues(values url.Values)
- func (s *BaseSink) StartStreaming(stream TransStream) error
- func (s *BaseSink) StopStreaming(stream TransStream)
- func (s *BaseSink) String() string
- func (s *BaseSink) UnLock()
- func (s *BaseSink) UrlValues() url.Values
- func (s *BaseSink) Write(index int, data [][]byte, ts int64) error
- type BaseTransStream
- func (t *BaseTransStream) AddTrack(track *Track) error
- func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte)
- func (t *BaseTransStream) ClearOutStreamBuffer()
- func (t *BaseTransStream) Close() ([][]byte, int64, error)
- func (t *BaseTransStream) GetID() TransStreamID
- func (t *BaseTransStream) GetProtocol() TransStreamProtocol
- func (t *BaseTransStream) GetTracks() []*Track
- func (t *BaseTransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error)
- func (t *BaseTransStream) IsExistVideo() bool
- func (t *BaseTransStream) OutStreamBufferCapacity() int
- func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error)
- func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error)
- func (t *BaseTransStream) SetID(id TransStreamID)
- func (t *BaseTransStream) SetProtocol(protocol TransStreamProtocol)
- func (t *BaseTransStream) TrackSize() int
- type BitrateStatistics
- type EnableConfig
- type GB28181Config
- type GOPBuffer
- type HlsConfig
- func (e *HlsConfig) IsEnable() bool
- func (c HlsConfig) M3U8Dir(sourceId string) string
- func (c HlsConfig) M3U8Format(sourceId string) string
- func (c HlsConfig) M3U8Path(sourceId string) string
- func (e *HlsConfig) SetEnable(b bool)
- func (c HlsConfig) TSFormat(sourceId string) string
- func (c HlsConfig) TSPath(sourceId string, tsSeq string) string
- type HookEvent
- type HooksConfig
- func (e *HooksConfig) IsEnable() bool
- func (hook *HooksConfig) IsEnableOnIdleTimeout() bool
- func (hook *HooksConfig) IsEnableOnPlay() bool
- func (hook *HooksConfig) IsEnableOnPlayDone() bool
- func (hook *HooksConfig) IsEnableOnPublishDone() bool
- func (hook *HooksConfig) IsEnableOnReceiveTimeout() bool
- func (hook *HooksConfig) IsEnableOnRecord() bool
- func (hook *HooksConfig) IsEnableOnStarted() bool
- func (hook *HooksConfig) IsEnablePublishEvent() bool
- func (e *HooksConfig) SetEnable(b bool)
- type HttpConfig
- type IPV4SinkID
- type IPV6SinkID
- type JT1078Config
- type JitterBuffer
- type LogConfig
- type MergeWritingBuffer
- type PortConfig
- type PublishSource
- func (s *PublishSource) AddSink(sink Sink)
- func (s *PublishSource) Close()
- func (s *PublishSource) CorrectTimestamp(packet utils.AVPacket)
- func (s *PublishSource) CreateDefaultOutStreams()
- func (s *PublishSource) CreateTime() time.Time
- func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)
- func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, ...)
- func (s *PublishSource) DispatchGOPBuffer(transStream TransStream)
- func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket)
- func (s *PublishSource) DoClose()
- func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool
- func (s *PublishSource) FindSink(id SinkID) Sink
- func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics
- func (s *PublishSource) GetID() string
- func (s *PublishSource) GetStreamEndInfo() *StreamEndInfo
- func (s *PublishSource) GetTransStreams() map[TransStreamID]TransStream
- func (s *PublishSource) GetType() SourceType
- func (s *PublishSource) Init(receiveQueueSize int)
- func (s *PublishSource) Input(data []byte) error
- func (s *PublishSource) IsClosed() bool
- func (s *PublishSource) IsCompleted() bool
- func (s *PublishSource) LastPacketTime() time.Time
- func (s *PublishSource) LastStreamEndTime() time.Time
- func (s *PublishSource) MainContextEvents() chan func()
- func (s *PublishSource) NotTrackAdded(index int) bool
- func (s *PublishSource) OnDeMuxDone()
- func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket)
- func (s *PublishSource) OnDeMuxStream(stream utils.AVStream)
- func (s *PublishSource) OnDeMuxStreamDone()
- func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket)
- func (s *PublishSource) OriginTracks() []*Track
- func (s *PublishSource) PostEvent(cb func())
- func (s *PublishSource) RemoteAddr() string
- func (s *PublishSource) RemoveSink(sink Sink)
- func (s *PublishSource) RemoveSinkWithID(id SinkID)
- func (s *PublishSource) SetCreateTime(time time.Time)
- func (s *PublishSource) SetID(id string)
- func (s *PublishSource) SetLastPacketTime(time2 time.Time)
- func (s *PublishSource) SetState(state SessionState)
- func (s *PublishSource) SetType(sourceType SourceType)
- func (s *PublishSource) SetUrlValues(values url.Values)
- func (s *PublishSource) SinkCount() int
- func (s *PublishSource) Sinks() []Sink
- func (s *PublishSource) State() SessionState
- func (s *PublishSource) StreamPipe() chan []byte
- func (s *PublishSource) String() string
- func (s *PublishSource) TranscodeTracks() []*Track
- func (s *PublishSource) UrlValues() url.Values
- type ReceiveBuffer
- type RecordConfig
- type RecordStreamFactory
- type RtmpConfig
- type RtspConfig
- type SessionHandler
- type SessionState
- type Sink
- type SinkID
- type Source
- type SourceType
- type StreamEndInfo
- type StreamEndInfoManager
- type StreamServer
- type TCPTransStream
- type Track
- type TrackManager
- func (s *TrackManager) Add(track *Track)
- func (s *TrackManager) All() []*Track
- func (s *TrackManager) Find(id utils.AVCodecID) *Track
- func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track
- func (s *TrackManager) FindTracksWithType(mediaType utils.AVMediaType) []*Track
- func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track
- type TransStream
- type TransStreamFactory
- type TransStreamID
- type TransStreamProtocol
- type TransportConfig
- type WebRtcConfig
Constants ¶
const ( HookEventPublish = HookEvent(0x1) HookEventPublishDone = HookEvent(0x2) HookEventPlay = HookEvent(0x3) HookEventPlayDone = HookEvent(0x4) HookEventRecord = HookEvent(0x5) HookEventIdleTimeout = HookEvent(0x6) HookEventReceiveTimeout = HookEvent(0x7) HookEventStarted = HookEvent(0x8) )
const ( ReceiveBufferUdpBlockCount = 300 ReceiveBufferTCPBlockCount = 50 )
const ( SourceTypeRtmp = SourceType(1) SourceType28181 = SourceType(2) SourceType1078 = SourceType(3) TransStreamRtmp = TransStreamProtocol(1) TransStreamFlv = TransStreamProtocol(2) TransStreamRtsp = TransStreamProtocol(3) TransStreamHls = TransStreamProtocol(4) TransStreamRtc = TransStreamProtocol(5) TransStreamGBStreamForward = TransStreamProtocol(6) // 国标级联转发 )
const ( SessionStateCreated = SessionState(1) // 新建状态 SessionStateHandshaking = SessionState(2) // 握手中 SessionStateHandshakeFailure = SessionState(3) // 握手失败 SessionStateHandshakeSuccess = SessionState(4) // 握手完成 SessionStateWaiting = SessionState(5) // 位于等待队列中 SessionStateTransferring = SessionState(6) // 推拉流中 SessionStateClosed = SessionState(7) // 关闭状态 )
const (
DefaultMergeWriteLatency = 350
)
Variables ¶
var SinkManager *sinkManager
SinkManager 目前只用于保存HLS拉流Sink
var SourceManager *sourceManger
SourceManager 全局管理所有推流源
var (
StreamEndInfoBride func(s Source) *StreamEndInfo
)
Functions ¶
func AddSinkToWaitingQueue ¶
func DumpStream2File ¶
func DumpStream2File(sourceType SourceType, conn net.Conn, data []byte)
DumpStream2File 保存推流到文件, 用4字节帧长分割
func EqualsTracks ¶
func EqualsTracks(info *StreamEndInfo, tracks []*Track) bool
func ExistSinkInWaitingQueue ¶
func ExtractAudioPacket ¶
func ExtractVideoPacket ¶
func GetStreamPlayUrls ¶
func HookIdleTimeoutEvent ¶
func HookPublishDoneEvent ¶
func HookPublishDoneEvent(source Source)
func HookReceiveTimeoutEvent ¶
func HookRecordEvent ¶
func InitHookUrls ¶
func InitHookUrls()
func IsSupportMux ¶
func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils.AVCodecID) bool
func JoinHostPort ¶
func ListenAddr ¶
func NewHookPlayEventInfo ¶
func NewHookPlayEventInfo(sink Sink) eventInfo
func NewHookPublishEventInfo ¶
func NewHookPublishEventInfo(source Source) eventInfo
func NewRecordEventInfo ¶
func PreparePublishSource ¶
func RegisterTransStreamFactory ¶
func RegisterTransStreamFactory(protocol TransStreamProtocol, streamFunc TransStreamFactory)
func SetDefaultConfig ¶
func SetDefaultConfig(config *AppConfig_)
func SetRecordStreamFactory ¶
func SetRecordStreamFactory(factory RecordStreamFactory)
func SinkId2String ¶
func StartReceiveDataTimer ¶
StartReceiveDataTimer 启动收流超时计时器
Types ¶
type AppConfig_ ¶
type AppConfig_ struct { GOPCache bool `json:"gop_cache"` // 是否开启GOP缓存,只缓存一组音视频 GOPBufferSize int `json:"gop_buffer_size"` // 预估GOPBuffer大小, AVPacket缓存池和合并写缓存池都会参考此大小 ProbeTimeout int `json:"probe_timeout"` // 收流解析AVStream的超时时间 WriteTimeout int `json:"write_timeout"` // Server向TCP拉流Conn发包的超时时间, 超过该时间, 直接主动断开Conn. 客户端重新拉流的成本小于服务器缓存成本. WriteBufferCapacity int `json:"-"` // 发送缓冲区容量大小, 缓冲区由多个内存块构成. PublicIP string `json:"public_ip"` ListenIP string `json:"listen_ip"` IdleTimeout int64 `json:"idle_timeout"` // 多长时间(单位秒)没有拉流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. ReceiveTimeout int64 `json:"receive_timeout"` // 多长时间(单位秒)没有收到流. 如果开启hook通知, 根据hook响应, 决定是否关闭Source(200-不关闭/非200关闭). 否则会直接关闭Source. Debug bool `json:"debug"` // debug模式, 开启将保存推流 //缓存指定时长的包,满了之后才发送给Sink. 可以降低用户态和内核态的交互频率,大幅提升性能. //合并写的大小范围,应当大于一帧的时长,不超过一组GOP的时长,在实际发送流的时候也会遵循此条例. MergeWriteLatency int `json:"mw_latency"` Rtmp RtmpConfig Hls HlsConfig JT1078 JT1078Config Rtsp RtspConfig GB28181 GB28181Config WebRtc WebRtcConfig Hooks HooksConfig Record RecordConfig Log LogConfig Http HttpConfig }
AppConfig_ GOP缓存和合并写必须保持一致, 同时开启或关闭. 关闭GOP缓存是为了降低延迟, 此时若又另外开启合并写则自相矛盾.
var AppConfig AppConfig_
func LoadConfigFile ¶
func LoadConfigFile(path string) (*AppConfig_, error)
type BaseSink ¶
type BaseSink struct { ID SinkID SourceID string Protocol TransStreamProtocol State SessionState TransStreamID TransStreamID DesiredAudioCodecId_ utils.AVCodecID DesiredVideoCodecId_ utils.AVCodecID Conn net.Conn // 拉流信令链路 TCPStreaming bool // 是否是TCP流式拉流 SentPacketCount int // 发包计数 Ready bool // 是否准备好推流. Sink可以通过控制该变量, 达到触发Source推流, 但不立即拉流的目的. 比如rtsp拉流端在信令交互阶段,需要先获取媒体信息,再拉流. // contains filtered or unexported fields }
func (*BaseSink) Close ¶
func (s *BaseSink) Close()
Close 做如下事情: 1. Sink如果正在拉流, 删除任务交给Source处理, 否则直接从等待队列删除Sink. 2. 发送PlayDoneHook事件
func (*BaseSink) CreateTime ¶
func (*BaseSink) DesiredAudioCodecId ¶
func (*BaseSink) DesiredVideoCodecId ¶
func (*BaseSink) EnableVideo ¶
func (*BaseSink) GetProtocol ¶
func (s *BaseSink) GetProtocol() TransStreamProtocol
func (*BaseSink) GetSentPacketCount ¶
func (*BaseSink) GetSourceID ¶
func (*BaseSink) GetState ¶
func (s *BaseSink) GetState() SessionState
func (*BaseSink) GetTransStreamID ¶
func (s *BaseSink) GetTransStreamID() TransStreamID
func (*BaseSink) IncreaseSentPacketCount ¶
func (s *BaseSink) IncreaseSentPacketCount()
func (*BaseSink) IsTCPStreaming ¶
func (*BaseSink) RemoteAddr ¶
func (*BaseSink) SetCreateTime ¶
func (*BaseSink) SetEnableVideo ¶
func (*BaseSink) SetSentPacketCount ¶
func (*BaseSink) SetState ¶
func (s *BaseSink) SetState(state SessionState)
func (*BaseSink) SetTransStreamID ¶
func (s *BaseSink) SetTransStreamID(id TransStreamID)
func (*BaseSink) SetUrlValues ¶
func (*BaseSink) StartStreaming ¶
func (s *BaseSink) StartStreaming(stream TransStream) error
func (*BaseSink) StopStreaming ¶
func (s *BaseSink) StopStreaming(stream TransStream)
type BaseTransStream ¶
type BaseTransStream struct { ID TransStreamID Tracks []*Track Completed bool ExistVideo bool Protocol TransStreamProtocol OutBuffer [][]byte // 传输流的合并写块队列 OutBufferSize int // 传输流返合并写块队列大小 }
func (*BaseTransStream) AddTrack ¶
func (t *BaseTransStream) AddTrack(track *Track) error
func (*BaseTransStream) AppendOutStreamBuffer ¶
func (t *BaseTransStream) AppendOutStreamBuffer(buffer []byte)
func (*BaseTransStream) ClearOutStreamBuffer ¶
func (t *BaseTransStream) ClearOutStreamBuffer()
func (*BaseTransStream) GetID ¶
func (t *BaseTransStream) GetID() TransStreamID
func (*BaseTransStream) GetProtocol ¶
func (t *BaseTransStream) GetProtocol() TransStreamProtocol
func (*BaseTransStream) GetTracks ¶
func (t *BaseTransStream) GetTracks() []*Track
func (*BaseTransStream) IsExistVideo ¶
func (t *BaseTransStream) IsExistVideo() bool
func (*BaseTransStream) OutStreamBufferCapacity ¶
func (t *BaseTransStream) OutStreamBufferCapacity() int
func (*BaseTransStream) ReadExtraData ¶
func (t *BaseTransStream) ReadExtraData(timestamp int64) ([][]byte, int64, error)
func (*BaseTransStream) ReadKeyFrameBuffer ¶
func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error)
func (*BaseTransStream) SetID ¶
func (t *BaseTransStream) SetID(id TransStreamID)
func (*BaseTransStream) SetProtocol ¶
func (t *BaseTransStream) SetProtocol(protocol TransStreamProtocol)
func (*BaseTransStream) TrackSize ¶
func (t *BaseTransStream) TrackSize() int
type BitrateStatistics ¶
type BitrateStatistics struct {
// contains filtered or unexported fields
}
BitrateStatistics 码流统计, 单位Byte
func NewBitrateStatistics ¶
func NewBitrateStatistics() *BitrateStatistics
func (*BitrateStatistics) Input ¶
func (b *BitrateStatistics) Input(size int)
func (*BitrateStatistics) PreviousSecond ¶
func (b *BitrateStatistics) PreviousSecond() int
PreviousSecond 返回前一秒的码流大小
type EnableConfig ¶
type GB28181Config ¶
type GB28181Config struct { TransportConfig Port []int `json:"port"` // contains filtered or unexported fields }
func (GB28181Config) IsMultiPort ¶
func (g GB28181Config) IsMultiPort() bool
type GOPBuffer ¶
type GOPBuffer interface { // AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败 AddPacket(packet utils.AVPacket) bool // SetDiscardHandler 设置丢弃帧时的回调 SetDiscardHandler(handler func(packet utils.AVPacket)) PeekAll(handler func(packet utils.AVPacket)) Peek(index int) utils.AVPacket Size() int Clear() Close() }
GOPBuffer GOP缓存
func NewStreamBuffer ¶
func NewStreamBuffer() GOPBuffer
type HlsConfig ¶
type HlsConfig struct { Dir string `json:"dir"` Duration int `json:"segment_duration"` PlaylistLength int `json:"playlist_length"` // contains filtered or unexported fields }
func (HlsConfig) M3U8Format ¶
M3U8Format 根据id返回m3u8文件名
func (HlsConfig) M3U8Path ¶
M3U8Path 根据sourceId返回m3u8的磁盘路径. 切片及目录生成规则, 以SourceId为34020000001320000001/34020000001320000001为例: 创建文件夹34020000001320000001, 34020000001320000001.m3u8文件, 文件列表中切片url为34020000001320000001_seq.ts
type HooksConfig ¶
type HooksConfig struct { Timeout int64 `json:"timeout"` OnStartedUrl string `json:"on_started"` //应用启动后回调 OnPublishUrl string `json:"on_publish"` //推流回调 OnPublishDoneUrl string `json:"on_publish_done"` //推流结束回调 OnPlayUrl string `json:"on_play"` //拉流回调 OnPlayDoneUrl string `json:"on_play_done"` //拉流结束回调 OnRecordUrl string `json:"on_record"` //录制流回调 OnIdleTimeoutUrl string `json:"on_idle_timeout"` //没有sink拉流回调 OnReceiveTimeoutUrl string `json:"on_receive_timeout"` //没有推流回调 // contains filtered or unexported fields }
func (*HooksConfig) IsEnableOnIdleTimeout ¶
func (hook *HooksConfig) IsEnableOnIdleTimeout() bool
func (*HooksConfig) IsEnableOnPlay ¶
func (hook *HooksConfig) IsEnableOnPlay() bool
func (*HooksConfig) IsEnableOnPlayDone ¶
func (hook *HooksConfig) IsEnableOnPlayDone() bool
func (*HooksConfig) IsEnableOnPublishDone ¶
func (hook *HooksConfig) IsEnableOnPublishDone() bool
func (*HooksConfig) IsEnableOnReceiveTimeout ¶
func (hook *HooksConfig) IsEnableOnReceiveTimeout() bool
func (*HooksConfig) IsEnableOnRecord ¶
func (hook *HooksConfig) IsEnableOnRecord() bool
func (*HooksConfig) IsEnableOnStarted ¶
func (hook *HooksConfig) IsEnableOnStarted() bool
func (*HooksConfig) IsEnablePublishEvent ¶
func (hook *HooksConfig) IsEnablePublishEvent() bool
type HttpConfig ¶
type HttpConfig struct {
Port int `json:"port"`
}
type IPV4SinkID ¶
type IPV4SinkID uint64
type IPV6SinkID ¶
type IPV6SinkID string
type JT1078Config ¶
type JT1078Config struct {
// contains filtered or unexported fields
}
type JitterBuffer ¶
type JitterBuffer struct {
// contains filtered or unexported fields
}
JitterBuffer 只处理乱序的JitterBuffer
func NewJitterBuffer ¶
func NewJitterBuffer() *JitterBuffer
func (*JitterBuffer) Flush ¶
func (j *JitterBuffer) Flush()
func (*JitterBuffer) Push ¶
func (j *JitterBuffer) Push(seq uint16, packet interface{})
func (*JitterBuffer) SetHandler ¶
func (j *JitterBuffer) SetHandler(handler func(packet interface{}))
type MergeWritingBuffer ¶
type MergeWritingBuffer interface { Allocate(size int, ts int64, videoKey bool) []byte // PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 未满返回nil. PeekCompletedSegment() ([]byte, bool) // FlushSegment 生成并返回当前切片, 以及是否是关键帧切片. FlushSegment() ([]byte, bool) // IsFull 当前切片已满 IsFull(ts int64) bool // IsNewSegment 当前切片是否还未写数据 IsNewSegment() bool // Reserve 从当前切片中预留指定长度数据 Reserve(length int) // ReadSegmentsFromKeyFrameIndex 返回最近的关键帧切片 ReadSegmentsFromKeyFrameIndex(cb func([]byte)) Capacity() int }
MergeWritingBuffer 实现针对RTMP/FLV/HLS等基于TCP传输流的合并写缓存. 包含多个合并写块, 循环使用, 至少需要等到第二个I帧才开始循环. WebRTC的I帧间隔可能会高达几十秒, 容量根据write_timeout发送超时和合并写时间来计算: write_timeout/mw_latency. 如果I帧间隔大于发送超时时间, 则需要创建新的块.
func NewMergeWritingBuffer ¶
func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer
type PortConfig ¶
type PublishSource ¶
type PublishSource struct { ID string Type SourceType Conn net.Conn TransDeMuxer stream.DeMuxer // 负责从推流协议中解析出AVStream和AVPacket TransStreams map[TransStreamID]TransStream // 所有的输出流, 持有Sink TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink // contains filtered or unexported fields }
func (*PublishSource) AddSink ¶
func (s *PublishSource) AddSink(sink Sink)
func (*PublishSource) Close ¶
func (s *PublishSource) Close()
func (*PublishSource) CorrectTimestamp ¶
func (s *PublishSource) CorrectTimestamp(packet utils.AVPacket)
func (*PublishSource) CreateDefaultOutStreams ¶
func (s *PublishSource) CreateDefaultOutStreams()
func (*PublishSource) CreateTime ¶
func (s *PublishSource) CreateTime() time.Time
func (*PublishSource) CreateTransStream ¶
func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)
func (*PublishSource) DispatchBuffer ¶
func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data [][]byte, timestamp int64, videoKey bool)
DispatchBuffer 分发传输流
func (*PublishSource) DispatchGOPBuffer ¶
func (s *PublishSource) DispatchGOPBuffer(transStream TransStream)
func (*PublishSource) DispatchPacket ¶
func (s *PublishSource) DispatchPacket(transStream TransStream, packet utils.AVPacket)
DispatchPacket 分发AVPacket
func (*PublishSource) DoClose ¶
func (s *PublishSource) DoClose()
func (*PublishSource) FindOrCreatePacketBuffer ¶
func (s *PublishSource) FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool
FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池
func (*PublishSource) FindSink ¶
func (s *PublishSource) FindSink(id SinkID) Sink
func (*PublishSource) GetBitrateStatistics ¶
func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics
func (*PublishSource) GetID ¶
func (s *PublishSource) GetID() string
func (*PublishSource) GetStreamEndInfo ¶
func (s *PublishSource) GetStreamEndInfo() *StreamEndInfo
func (*PublishSource) GetTransStreams ¶
func (s *PublishSource) GetTransStreams() map[TransStreamID]TransStream
func (*PublishSource) GetType ¶
func (s *PublishSource) GetType() SourceType
func (*PublishSource) Init ¶
func (s *PublishSource) Init(receiveQueueSize int)
func (*PublishSource) Input ¶
func (s *PublishSource) Input(data []byte) error
func (*PublishSource) IsClosed ¶
func (s *PublishSource) IsClosed() bool
func (*PublishSource) IsCompleted ¶
func (s *PublishSource) IsCompleted() bool
func (*PublishSource) LastPacketTime ¶
func (s *PublishSource) LastPacketTime() time.Time
func (*PublishSource) LastStreamEndTime ¶
func (s *PublishSource) LastStreamEndTime() time.Time
func (*PublishSource) MainContextEvents ¶
func (s *PublishSource) MainContextEvents() chan func()
func (*PublishSource) NotTrackAdded ¶
func (s *PublishSource) NotTrackAdded(index int) bool
NotTrackAdded 返回该index对应的track是否没有添加
func (*PublishSource) OnDeMuxDone ¶
func (s *PublishSource) OnDeMuxDone()
func (*PublishSource) OnDeMuxPacket ¶
func (s *PublishSource) OnDeMuxPacket(packet utils.AVPacket)
func (*PublishSource) OnDeMuxStream ¶
func (s *PublishSource) OnDeMuxStream(stream utils.AVStream)
func (*PublishSource) OnDeMuxStreamDone ¶
func (s *PublishSource) OnDeMuxStreamDone()
func (*PublishSource) OnDiscardPacket ¶
func (s *PublishSource) OnDiscardPacket(packet utils.AVPacket)
OnDiscardPacket GOP缓存溢出丢弃回调
func (*PublishSource) OriginTracks ¶
func (s *PublishSource) OriginTracks() []*Track
func (*PublishSource) PostEvent ¶
func (s *PublishSource) PostEvent(cb func())
func (*PublishSource) RemoteAddr ¶
func (s *PublishSource) RemoteAddr() string
func (*PublishSource) RemoveSink ¶
func (s *PublishSource) RemoveSink(sink Sink)
func (*PublishSource) RemoveSinkWithID ¶
func (s *PublishSource) RemoveSinkWithID(id SinkID)
func (*PublishSource) SetCreateTime ¶
func (s *PublishSource) SetCreateTime(time time.Time)
func (*PublishSource) SetID ¶
func (s *PublishSource) SetID(id string)
func (*PublishSource) SetLastPacketTime ¶
func (s *PublishSource) SetLastPacketTime(time2 time.Time)
func (*PublishSource) SetState ¶
func (s *PublishSource) SetState(state SessionState)
func (*PublishSource) SetType ¶
func (s *PublishSource) SetType(sourceType SourceType)
func (*PublishSource) SetUrlValues ¶
func (s *PublishSource) SetUrlValues(values url.Values)
func (*PublishSource) SinkCount ¶
func (s *PublishSource) SinkCount() int
func (*PublishSource) Sinks ¶
func (s *PublishSource) Sinks() []Sink
func (*PublishSource) State ¶
func (s *PublishSource) State() SessionState
func (*PublishSource) StreamPipe ¶
func (s *PublishSource) StreamPipe() chan []byte
func (*PublishSource) String ¶
func (s *PublishSource) String() string
func (*PublishSource) TranscodeTracks ¶
func (s *PublishSource) TranscodeTracks() []*Track
func (*PublishSource) UrlValues ¶
func (s *PublishSource) UrlValues() url.Values
type ReceiveBuffer ¶
type ReceiveBuffer struct {
// contains filtered or unexported fields
}
ReceiveBuffer 收流缓冲区. 网络收流->解析流->封装流->发送流是同步的, 从解析到发送可能耗时, 从而影响读取网络流. 使用收流缓冲区, 可有效降低出现此问题的概率. 从网络IO读取数据->送给解复用器, 此过程需做到无内存拷贝. rtmp和1078推流直接使用ReceiveBuffer. 国标推流, UDP收流都要经过jitter buffer处理, 还是需要拷贝一次, 没必要使用ReceiveBuffer; TCP全都使用ReceiveBuffer, 区别在于: 多端口模式, 第一包传给source; 单端口模式先解析出ssrc, 找到source, 后续再传给source.
func NewReceiveBuffer ¶
func NewReceiveBuffer(blockSize, blockCount int) *ReceiveBuffer
func NewTCPReceiveBuffer ¶
func NewTCPReceiveBuffer() *ReceiveBuffer
func NewUDPReceiveBuffer ¶
func NewUDPReceiveBuffer() *ReceiveBuffer
func (*ReceiveBuffer) BlockCount ¶
func (r *ReceiveBuffer) BlockCount() int
func (*ReceiveBuffer) Get ¶
func (r *ReceiveBuffer) Get(index int) []byte
func (*ReceiveBuffer) GetBlock ¶
func (r *ReceiveBuffer) GetBlock() []byte
func (*ReceiveBuffer) Index ¶
func (r *ReceiveBuffer) Index() int
type RecordConfig ¶
type RtmpConfig ¶
type RtmpConfig struct {
// contains filtered or unexported fields
}
type RtspConfig ¶
type RtspConfig struct { TransportConfig Port []int `json:"port"` Password string `json:"password"` // contains filtered or unexported fields }
func (RtspConfig) IsMultiPort ¶
func (g RtspConfig) IsMultiPort() bool
type SessionHandler ¶
type SessionState ¶
type SessionState uint32
SessionState 推拉流Session的状态, 包含握手和Hook授权阶段.
func (SessionState) String ¶
func (s SessionState) String() string
type Sink ¶
type Sink interface { GetID() SinkID SetID(sink SinkID) GetSourceID() string Write(index int, data [][]byte, ts int64) error GetTransStreamID() TransStreamID SetTransStreamID(id TransStreamID) GetProtocol() TransStreamProtocol // GetState 获取Sink状态, 调用前外部必须手动加锁 GetState() SessionState // SetState 设置Sink状态, 调用前外部必须手动加锁 SetState(state SessionState) EnableVideo() bool // SetEnableVideo 设置是否拉取视频流, 允许客户端只拉取音频流 SetEnableVideo(enable bool) // DesiredAudioCodecId 允许客户端拉取指定的音频流 DesiredAudioCodecId() utils.AVCodecID // DesiredVideoCodecId DescribeVideoCodecId 允许客户端拉取指定的视频流 DesiredVideoCodecId() utils.AVCodecID // Close 关闭释放Sink, 从传输流或等待队列中删除sink Close() String() string RemoteAddr() string // Lock Sink请求拉流->Source推流->Sink断开整个阶段, 是无锁线程安全 // 如果Sink在等待队列-Sink断开, 这个过程是非线程安全的 // 所以Source在AddSink时, SessionStateWait状态时, 需要加锁保护. Lock() UnLock() UrlValues() url.Values SetUrlValues(values url.Values) // StartStreaming Source向Sink开始推流时调用 StartStreaming(stream TransStream) error // StopStreaming Source向Sink停止推流时调用 StopStreaming(stream TransStream) GetConn() net.Conn IsTCPStreaming() bool GetSentPacketCount() int SetSentPacketCount(int) IncreaseSentPacketCount() IsReady() bool SetReady(ok bool) CreateTime() time.Time SetCreateTime(time time.Time) }
Sink 对拉流端的封装
func PopWaitingSinks ¶
type SinkID ¶
type SinkID interface{}
SinkID IPV4使用uint64、IPV6使用string作为ID类型
func NetAddr2SinkId ¶
NetAddr2SinkId 根据网络地址生成SinkId. IPV4使用一个uint64, IPV6使用string.
type Source ¶
type Source interface { // GetID 返回SourceID GetID() string SetID(id string) // Input 输入推流数据 Input(data []byte) error // GetType 返回推流类型 GetType() SourceType SetType(sourceType SourceType) // OriginTracks 返回所有的推流track OriginTracks() []*Track // TranscodeTracks 返回所有的转码track TranscodeTracks() []*Track // AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader,先将Sink添加到等待队列. // 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink AddSink(sink Sink) // RemoveSink 删除Sink RemoveSink(sink Sink) RemoveSinkWithID(id SinkID) FindSink(id SinkID) Sink SetState(state SessionState) // Close 关闭Source // 关闭推流网络链路, 停止一切封装和转发流以及转码工作 // 将Sink添加到等待队列 Close() DoClose() // IsCompleted 所有推流track是否解析完毕 IsCompleted() bool // FindOrCreatePacketBuffer 查找或者创建AVPacket的内存池 FindOrCreatePacketBuffer(index int, mediaType utils.AVMediaType) collections.MemoryPool // OnDiscardPacket GOP缓存溢出回调, 释放AVPacket OnDiscardPacket(pkt utils.AVPacket) // OnDeMuxStream 解析出AVStream回调 OnDeMuxStream(stream utils.AVStream) // OnDeMuxStreamDone 所有track解析完毕回调, 后续的OnDeMuxStream回调不再处理 OnDeMuxStreamDone() // OnDeMuxPacket 解析出AvPacket回调 OnDeMuxPacket(packet utils.AVPacket) // OnDeMuxDone 所有流解析完毕回调 OnDeMuxDone() Init(receiveQueueSize int) RemoteAddr() string String() string State() SessionState // UrlValues 返回推流url参数 UrlValues() url.Values // SetUrlValues 设置推流url参数 SetUrlValues(values url.Values) // PostEvent 切换到主协程执行当前函数 PostEvent(cb func()) // LastPacketTime 返回最近收流时间戳 LastPacketTime() time.Time SetLastPacketTime(time2 time.Time) // SinkCount 返回拉流计数 SinkCount() int // LastStreamEndTime 返回最近结束拉流时间戳 LastStreamEndTime() time.Time IsClosed() bool StreamPipe() chan []byte MainContextEvents() chan func() CreateTime() time.Time SetCreateTime(time time.Time) Sinks() []Sink GetBitrateStatistics() *BitrateStatistics GetTransStreams() map[TransStreamID]TransStream GetStreamEndInfo() *StreamEndInfo }
Source 对推流源的封装
type SourceType ¶
type SourceType byte
SourceType 推流类型
func (SourceType) String ¶
func (s SourceType) String() string
type StreamEndInfo ¶
type StreamEndInfo struct { ID string Timestamps map[utils.AVCodecID][2]int64 // 每路track结束时间戳 M3U8Writer libhls.M3U8Writer // 保存M3U8生成器 PlaylistFormat *string // M3U8播放列表 RtspTracks map[byte]uint16 // rtsp每路track的结束序号 FLVPrevTagSize uint32 // flv的最后一个tag大小, 下次生成flv时作为prev tag size }
StreamEndInfo 保留结束推流Source的推流信息. 在结束推流时,如果还有拉流端没有断开,则保留一些推流信息(目前有时间戳、ts切片序号等等)。在下次推流时,使用该时间戳作为新传输流的起始时间戳,保证拉流端在拉流时不会出现pts和dts错误. 如果在此之前,陆续有拉流端断开,直至sink计数为0,则不再保留该信息。
type StreamEndInfoManager ¶
type StreamEndInfoManager struct {
// contains filtered or unexported fields
}
func (*StreamEndInfoManager) Add ¶
func (s *StreamEndInfoManager) Add(history *StreamEndInfo)
func (*StreamEndInfoManager) Remove ¶
func (s *StreamEndInfoManager) Remove(id string) *StreamEndInfo
type StreamServer ¶
type StreamServer[T any] struct { SourceType SourceType Handler SessionHandler[T] }
func (*StreamServer[T]) OnConnected ¶
func (s *StreamServer[T]) OnConnected(conn net.Conn) []byte
func (*StreamServer[T]) OnDisConnected ¶
func (s *StreamServer[T]) OnDisConnected(conn net.Conn, err error)
type TCPTransStream ¶
type TCPTransStream struct { BaseTransStream // 合并写内存泄露问题: 推流结束后, mwBuffer的data一直释放不掉, 只有拉流全部断开之后, 才会释放该内存. // 起初怀疑是代码层哪儿有问题, 但是测试发现如果将合并写切片再拷贝一次发送 给sink, 推流结束后,mwBuffer的data内存块释放没问题, 只有拷贝的内存块未释放. 所以排除了代码层造成内存泄露的可能性. // 看来是conn在write后还会持有data. 查阅代码发现, 的确如此. 向fd发送数据前buffer会引用data, 但是后续没有赋值为nil, 取消引用. https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/internal/poll/fd_windows.go#L694 MWBuffer MergeWritingBuffer //合并写缓冲区, 同时作为用户态的发送缓冲区 }
func (*TCPTransStream) OutStreamBufferCapacity ¶
func (t *TCPTransStream) OutStreamBufferCapacity() int
type Track ¶
type TrackManager ¶
type TrackManager struct {
// contains filtered or unexported fields
}
func (*TrackManager) Add ¶
func (s *TrackManager) Add(track *Track)
func (*TrackManager) All ¶
func (s *TrackManager) All() []*Track
func (*TrackManager) FindTracks ¶
func (s *TrackManager) FindTracks(id utils.AVCodecID) []*Track
func (*TrackManager) FindTracksWithType ¶
func (s *TrackManager) FindTracksWithType(mediaType utils.AVMediaType) []*Track
func (*TrackManager) FindWithType ¶
func (s *TrackManager) FindWithType(mediaType utils.AVMediaType) *Track
type TransStream ¶
type TransStream interface { GetID() TransStreamID SetID(id TransStreamID) // Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧 Input(packet utils.AVPacket) ([][]byte, int64, bool, error) AddTrack(track *Track) error TrackSize() int GetTracks() []*Track // WriteHeader track添加完毕, 通过调用此函数告知 WriteHeader() error // GetProtocol 返回输出流协议 GetProtocol() TransStreamProtocol SetProtocol(protocol TransStreamProtocol) // ReadExtraData 读取传输流的编码器扩展数据 ReadExtraData(timestamp int64) ([][]byte, int64, error) // ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列 ReadKeyFrameBuffer() ([][]byte, int64, error) // Close 关闭传输流, 返回还未flush的合并写块 Close() ([][]byte, int64, error) // ClearOutStreamBuffer 清空传输流的合并写块队列 ClearOutStreamBuffer() // AppendOutStreamBuffer 添加合并写块到队列 AppendOutStreamBuffer(buffer []byte) // OutStreamBufferCapacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小; OutStreamBufferCapacity() int IsExistVideo() bool }
TransStream 将AVPacket封装成传输流
func CreateTransStream ¶
func CreateTransStream(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)
type TransStreamFactory ¶
type TransStreamFactory func(source Source, protocol TransStreamProtocol, tracks []*Track) (TransStream, error)
func FindTransStreamFactory ¶
func FindTransStreamFactory(protocol TransStreamProtocol) (TransStreamFactory, error)
type TransStreamID ¶
type TransStreamID uint64
TransStreamID 每个传输流的唯一Id, 根据输出流协议ID+流包含的音视频编码器ID生成. 输出流协议ID占用高8位, 每个音视频编码器ID占用8位, 意味着每个输出流至多7路流.
func GenerateTransStreamID ¶
func GenerateTransStreamID(protocol TransStreamProtocol, tracks ...*Track) TransStreamID
GenerateTransStreamID 根据输出流协议和输出流包含的音视频编码器ID生成流ID
type TransStreamProtocol ¶
type TransStreamProtocol uint32
TransStreamProtocol 输出的流协议
func (TransStreamProtocol) String ¶
func (p TransStreamProtocol) String() string
type TransportConfig ¶
type TransportConfig struct {
Transport string `json:"transport"` //"UDP|TCP"
}
func (TransportConfig) IsEnableTCP ¶
func (g TransportConfig) IsEnableTCP() bool
func (TransportConfig) IsEnableUDP ¶
func (g TransportConfig) IsEnableUDP() bool
type WebRtcConfig ¶
type WebRtcConfig struct { TransportConfig // contains filtered or unexported fields }
Source Files ¶
- bitrate_statistics.go
- config.go
- gop_buffer.go
- hook.go
- hook_event.go
- hook_sink.go
- hook_source.go
- jitter_buffer.go
- mw_buffer.go
- receive_buffer.go
- sink.go
- sink_manager.go
- sink_utils.go
- sink_waitting.go
- source.go
- source_manager.go
- source_utils.go
- stream_endinfo.go
- stream_factory.go
- stream_server.go
- track.go
- track_manager.go
- trans_stream.go
- trans_utils.go