Documentation ¶
Index ¶
- Constants
- Variables
- func CalcM3u8Duration(content []byte) (durationSec float64, err error)
- func GetFsHandler() filesystemlayer.IFileSystemLayer
- func ReadFile(filename string) ([]byte, error)
- func RemoveAll(path string) error
- func SetUseMemoryAsDiskFlag(flag bool)
- func SplitFragment2TsPackets(content []byte) (ret [][]byte, err error)
- type CustomCleanUpCallback
- type DefaultPathStrategy
- func (*DefaultPathStrategy) GetLiveM3u8FileName(outPath string, streamName string) string
- func (*DefaultPathStrategy) GetMuxerOutPath(rootOutPath string, streamName string) string
- func (*DefaultPathStrategy) GetRecordM3u8FileName(outPath string, streamName string) string
- func (dps *DefaultPathStrategy) GetRequestInfo(uri string, rootOutPath string) (ri RequestInfo)
- func (*DefaultPathStrategy) GetTsFileName(streamName string, index int, timestamp int) string
- func (*DefaultPathStrategy) GetTsFileNameWithPath(outPath string, fileName string) string
- type Fragment
- type IPathRequestStrategy
- type IPathStrategy
- type IPathWriteStrategy
- type IQueueObserver
- type Muxer
- func (m *Muxer) DisableStream(name string)
- func (m *Muxer) Dispose()
- func (m *Muxer) EnableStream(name string)
- func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg)
- func (m *Muxer) GetStreamName() string
- func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame)
- func (m *Muxer) OnPatPmt(b []byte)
- func (m *Muxer) OutPath() string
- func (m *Muxer) Start()
- func (m *Muxer) WriteRecordPlaylistWithFile(fileName, backupFileName string, isLast bool)
- type MuxerConfig
- type MuxerEventObserver
- type MuxerObserver
- type Queue
- type ReadFileFallback
- type RequestInfo
- type ServerHandler
- type Streamer
- func (s *Streamer) AudioCacheEmpty() bool
- func (s *Streamer) AudioSeqHeaderCached() bool
- func (s *Streamer) FeedRtmpMessage(msg base.RtmpMsg)
- func (s *Streamer) FlushAudio()
- func (s *Streamer) OnPatPmt(b []byte)
- func (s *Streamer) OnPop(msg base.RtmpMsg)
- func (s *Streamer) VideoSeqHeaderCached() bool
- type StreamerObserver
Constants ¶
const ( CleanupModeNever = 0 CleanupModeInTheEnd = 1 CleanupModeAsap = 2 // 自定义清理过程 CleanupCustomHandler = 99 )
Variables ¶
var ErrHls = errors.New("lal.hls: fxxk")
Functions ¶
func CalcM3u8Duration ¶ added in v0.24.1
content 传入m3u8文件内容
@return durationSec m3u8中所有ts的时间总和。注意,使用的是m3u8文件中描述的ts时间,而不是读取ts文件中实际音视频数据的时间。
func GetFsHandler ¶ added in v0.24.5
func GetFsHandler() filesystemlayer.IFileSystemLayer
func SetUseMemoryAsDiskFlag ¶ added in v0.19.12
func SetUseMemoryAsDiskFlag(flag bool)
func SplitFragment2TsPackets ¶ added in v0.24.1
Types ¶
type CustomCleanUpCallback ¶ added in v0.24.1
type DefaultPathStrategy ¶ added in v0.24.1
type DefaultPathStrategy struct { }
默认的路由,落盘策略
每个流在<rootPath>下以流名称生成一个子目录,目录下包含:
- playlist.m3u8 实时的HLS文件,定期刷新,写入当前最新的TS文件列表,淘汰过期的TS文件列表 - record.m3u8 录制回放的HLS文件,包含了从流开始至今的所有TS文件 - test110-1620540712084-0.ts TS分片文件,命名格式为{liveid}-{timestamp}-{index}.ts - test110-1620540716095-1.ts - ... 一系列的TS文件
假设 流名称="test110" rootPath="/tmp/lal/hls/"
则 http://127.0.0.1:8080/hls/test110/playlist.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8 http://127.0.0.1:8080/hls/test110/record.m3u8 -> /tmp/lal/hls/test110/record.m3u8 http://127.0.0.1:8080/hls/test110/test110-1620540712084-0.ts -> /tmp/lal/hls/test110/test110-1620540712084-0.ts
http://127.0.0.1:8080/hls/test110.m3u8 -> /tmp/lal/hls/test110/playlist.m3u8 http://127.0.0.1:8080/hls/test110-1620540712084-0.ts -> /tmp/lal/hls/test110/test110-1620540712084-0.ts 最下面这两个做了特殊映射
func (*DefaultPathStrategy) GetLiveM3u8FileName ¶ added in v0.24.1
func (*DefaultPathStrategy) GetLiveM3u8FileName(outPath string, streamName string) string
func (*DefaultPathStrategy) GetMuxerOutPath ¶ added in v0.24.1
func (*DefaultPathStrategy) GetMuxerOutPath(rootOutPath string, streamName string) string
<rootOutPath>/<streamName>
func (*DefaultPathStrategy) GetRecordM3u8FileName ¶ added in v0.24.1
func (*DefaultPathStrategy) GetRecordM3u8FileName(outPath string, streamName string) string
func (*DefaultPathStrategy) GetRequestInfo ¶ added in v0.24.1
func (dps *DefaultPathStrategy) GetRequestInfo(uri string, rootOutPath string) (ri RequestInfo)
RequestURI example: uri -> FileName StreamName FileType FileNameWithPath /hls/test110.m3u8 -> test110.m3u8 test110 m3u8 {rootOutPath}/test110/playlist.m3u8 /hls/test110/playlist.m3u8 -> playlist.m3u8 test110 m3u8 {rootOutPath}/test110/playlist.m3u8 /hls/test110/record.m3u8 -> record.m3u8 test110 m3u8 {rootOutPath}/test110/record.m3u8 /hls/test110/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts /hls/test110-1620540712084-.ts -> test110-1620540712084-.ts test110 ts {rootOutPath/test110/test110-1620540712084-.ts
func (*DefaultPathStrategy) GetTsFileName ¶ added in v0.24.1
func (*DefaultPathStrategy) GetTsFileName(streamName string, index int, timestamp int) string
func (*DefaultPathStrategy) GetTsFileNameWithPath ¶ added in v0.24.1
func (*DefaultPathStrategy) GetTsFileNameWithPath(outPath string, fileName string) string
type IPathRequestStrategy ¶ added in v0.24.1
type IPathRequestStrategy interface { // 解析HTTP请求,得到文件名、文件类型、流名称、文件所在路径 GetRequestInfo(uri string, rootOutPath string) RequestInfo }
路由策略 接到HTTP请求时,对应文件路径的映射逻辑
type IPathStrategy ¶ added in v0.24.1
type IPathStrategy interface { IPathRequestStrategy IPathWriteStrategy }
var (
PathStrategy IPathStrategy = &DefaultPathStrategy{}
)
type IPathWriteStrategy ¶ added in v0.24.1
type IPathWriteStrategy interface { // 获取单个流对应的文件根路径 GetMuxerOutPath(rootOutPath string, streamName string) string // 获取单个流对应的m3u8文件路径 // // @param outPath: func GetMuxerOutPath的结果 GetLiveM3u8FileName(outPath string, streamName string) string // 获取单个流对应的record类型的m3u8文件路径 // // live m3u8和record m3u8的区别: // live记录的是当前最近的可播放内容,record记录的是从流开始时的可播放内容 // // @param outPath: func GetMuxerOutPath的结果 GetRecordM3u8FileName(outPath string, streamName string) string // 获取单个流对应的ts文件路径 // // @param outPath: func GetMuxerOutPath的结果 GetTsFileNameWithPath(outPath string, fileName string) string // ts文件名的生成策略 GetTsFileName(streamName string, index int, timestamp int) string }
落盘策略
type IQueueObserver ¶ added in v0.24.1
type Muxer ¶
type Muxer struct { UniqueKey string // contains filtered or unexported fields }
输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流
func NewMuxer ¶
func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer MuxerObserver, eventObserver MuxerEventObserver) *Muxer
NewMuxer @param enable 如果false,说明hls功能没开,也即不写文件,但是MuxerObserver依然会回调 @param observer 可以为nil,如果不为nil,TS流将回调给上层 @param eventObserver 可以为nil,主要用于触发新增 frag 和关闭 frag 事件给外部
func (*Muxer) DisableStream ¶ added in v0.24.20
func (*Muxer) EnableStream ¶ added in v0.24.20
func (*Muxer) FeedRtmpMessage ¶ added in v0.24.1
@param msg 函数调用结束后,内部不持有msg中的内存块
func (*Muxer) GetStreamName ¶ added in v0.24.11
func (*Muxer) WriteRecordPlaylistWithFile ¶ added in v0.24.19
type MuxerConfig ¶
type MuxerConfig struct { OutPath string `json:"out_path"` // m3u8和ts文件的输出根目录,注意,末尾需以'/'结束 FragmentDurationMs int `json:"fragment_duration_ms"` FragmentNum int `json:"fragment_num"` // hls文件清理模式: // 0 不删除m3u8+ts文件,可用于录制等场景 // 1 在输入流结束后删除m3u8+ts文件 // 注意,确切的删除时间是推流结束后的<fragment_duration_ms> * <fragment_num> * 2的时间点 // 推迟一小段时间删除,是为了避免输入流刚结束,hls的拉流端还没有拉取完 // 2 推流过程中,持续删除过期的ts文件,只保留最近的<fragment_num> * 2个左右的ts文件 // TODO chef: lalserver的模式1的逻辑是在上层做的,应该重构到hls模块中 CleanupMode int `json:"cleanup_mode"` CleanupCustomCallback CustomCleanUpCallback `json:"-"` }
type MuxerEventObserver ¶ added in v0.19.3
type MuxerEventObserver interface { // OnOpenFragment // @param ts 新建立fragment时的时间戳,毫秒 * 90 // @param id fragment的自增序号 // @param discont 不连续标志,会在m3u8文件的fragment前增加`#EXT-X-DISCONTINUITY` // @pram fileName fragment 文件名 OnOpenFragment(m *Muxer, rightNow time.Time, ts uint64, id int, discont bool, fileName string) // OnCloseFragment // @param id fragment的自增序号 // @param duration 当前fragment中数据的时长,单位秒 OnCloseFragment(m *Muxer, id int, duration float64, isLast bool) }
type MuxerObserver ¶
type Queue ¶ added in v0.24.1
type Queue struct {
// contains filtered or unexported fields
}
缓存流起始的一些数据,判断流中是否存在音频、视频,以及编码格式 一旦判断结束,该队列变成直进直出,不再有实际缓存
func NewQueue ¶ added in v0.24.1
func NewQueue(maxMsgSize int, observer IQueueObserver) *Queue
@param maxMsgSize 最大缓存多少个包
type ReadFileFallback ¶ added in v0.19.9
type RequestInfo ¶ added in v0.24.1
type ServerHandler ¶ added in v0.24.1
type ServerHandler struct {
// contains filtered or unexported fields
}
func NewServerHandler ¶ added in v0.24.1
func NewServerHandler(outPath string) *ServerHandler
func (*ServerHandler) ServeHTTP ¶ added in v0.24.1
func (s *ServerHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
func (*ServerHandler) SetReadFileFallback ¶ added in v0.24.1
func (s *ServerHandler) SetReadFileFallback(fallback ReadFileFallback)
type Streamer ¶
type Streamer struct { UniqueKey string // contains filtered or unexported fields }
输入rtmp流,回调转封装成Annexb格式的流
func NewStreamer ¶
func NewStreamer(observer StreamerObserver) *Streamer
func (*Streamer) AudioCacheEmpty ¶
func (*Streamer) AudioSeqHeaderCached ¶
func (*Streamer) FeedRtmpMessage ¶ added in v0.24.1
@param msg msg.Payload 调用结束后,函数内部不会持有这块内存
TODO chef: 可以考虑数据有问题时,返回给上层,直接主动关闭输入流的连接
func (*Streamer) FlushAudio ¶
func (s *Streamer) FlushAudio()
吐出音频数据的三种情况: 1. 收到音频或视频时,音频缓存队列已达到一定长度 2. 打开一个新的TS文件切片时 3. 输入流关闭时
func (*Streamer) VideoSeqHeaderCached ¶
type StreamerObserver ¶
type StreamerObserver interface { // @param b const只读内存块,上层可以持有,但是不允许修改 OnPatPmt(b []byte) // @param streamer: 供上层获取streamer内部的一些状态,比如spspps是否已缓存,音频缓存队列是否有数据等 // // @param frame: 各字段含义见mpegts.Frame结构体定义 // frame.CC 注意,回调结束后,Streamer会保存frame.CC,上层在TS打包完成后,可通过frame.CC将cc值传递给Streamer // frame.Raw 回调结束后,这块内存可能会被内部重复使用 // OnFrame(streamer *Streamer, frame *mpegts.Frame) }