README
¶
m7s v4核心引擎
该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。包含了一个插件的引入机制,其他功能均由插件实现
引擎的基本功能
- 提供插件机制,对插件的启动,配置解析,事件派发等进行统一管理
- 提供H264、H265、AAC、G711格式的转发
- 提供可复用的AVCC格式、RTP格式、AnnexB格式、ADTS格式等预封装机制
- 提供多Track机制,支持大小流,加密流扩展
- 提供DataTrack机制,可用于实现房间文字聊天等功能
- 提供时间戳同步机制,限速机制
- 提供RTP包乱序重排机制
- 提供订阅者追帧跳帧机制
- 提供发布订阅对外推拉的基础架构
- 提供鉴权机制的底层架构支持
- 提供内存复用机制
- 提供发布者断线重连机制
- 提供按需拉流机制
- 提供HTTP服务端口公用机制
- 提供HTTP API接口自动注册机制
- 提供HTTP接口中间件机制
- 提供结构化日志
- 提供流信息统计和输出
- 提供事件总线机制,可以对所有插件广播事件
- 提供配置热更新机制
引擎自带HTTP接口
- 获取某一个流的详情
/api/stream?streamPath=xxx
- 终止某一个流
/api/closestream?streamPath=xxx
- 获取engine信息
/api/sysInfo
返回值{Version:xxx,StartTime:xxx,IP:[xxx.xxx.xxx.xxx]} - 获取系统基本情况
/api/summary
返回值Summary数据 - 获取所有插件信息
/api/plugins
返回值Plugin数据 - 读取mp4文件再次发布为视频流
/api/replay/mp4?streamPath=xxx&dump=filepath
filepath是文件路径 - 读取ts文件再次发布为视频流
/api/replay/ts?streamPath=xxx&dump=filepath
filepath是文件路径 - 获取指定的配置信息
/api/getconfig?name=xxx
返回xxx插件的配置信息,如果不带参数或参数为空则返回全局配置 - 修改并保存配置信息
/api/modifyconfig?name=xxx&yaml=1
修改xxx插件的配置信息,在请求的body中传入修改后的配置yaml字符串 - 热更新配置信息
/api/updateconfig?name=xxx
热更新xxx插件的配置信息,如果不带参数或参数为空则热更新全局配置 - 获取所有远端拉流信息
/api/list/pull
返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} - 获取所有向远端推流信息
/api/list/push
返回{RemoteURL:"",StreamPath:"",Type:"",StartTime:""} - 停止推流
/api/stoppush?url=xxx
停止向xxx推流 ,成功返回ok
引擎默认配置
global:
disableall: false # 是否禁用所有插件
loglang: zh # 日志语言,可选值:zh,en
loglevel: info # 日志级别,可选值:debug,info,warn,error,panic,fatal
http:
listenaddr: :8080 # 网关地址,用于访问API
listenaddrtls: "" # 用于HTTPS方式访问API的端口配置
certfile: ""
keyfile: ""
cors: true # 是否自动添加cors头
username: "" # 用户名和密码,用于API访问时的基本身份认证
password: ""
readtimeout: 0 # 读取超时时间,0为不限制
writetimeout: 0 # 写入超时时间,0为不限制
idletimeout: 0 # 空闲超时时间,0为不限制
publish:
pubaudio: true # 是否发布音频流
pubvideo: true # 是否发布视频流
kickexist: false # 剔出已经存在的发布者,用于顶替原有发布者
publishtimeout: 10s # 发布流默认过期时间,超过该时间发布者没有恢复流将被删除
delayclosetimeout: 0 # 自动关闭触发后延迟的时间(期间内如果有新的订阅则取消触发关闭),0为关闭该功能,保持连接。
waitclosetimeout: 0 # 发布者断开后等待时间,超过该时间发布者没有恢复流将被删除,0为关闭该功能,由订阅者决定是否删除
buffertime: 0 # 缓存时间,用于时光回溯,0为关闭缓存
idletimeout: 0 # 空闲超时时间,0为不限制
poll: 20ms # 订阅者轮询时间,伪自选锁等待周期
key: # 发布鉴权key
secretargname: secret # 发布鉴权参数名
expireargname: expire # 发布鉴权失效时间参数名
subscribe:
subaudio: true # 是否订阅音频流
subvideo: true # 是否订阅视频流
subaudioargname: ats # 订阅音频轨道参数名
subvideoargname: vts # 订阅视频轨道参数名
subdataargname: dts # 订阅数据轨道参数名
subaudiotracks: [] # 订阅音频轨道名称列表
subvideotracks: [] # 订阅视频轨道名称列表
submode: 0 # 订阅模式,0为跳帧追赶模式,1为不追赶(多用于录制),2为时光回溯模式
iframeonly: false # 只订阅关键帧
waittimeout: 10s # 等待发布者的超时时间,用于订阅尚未发布的流
writebuffersize: 0 # 订阅者写缓存大小,用于减少io次数,但可能影响实时性
poll: 20ms # 订阅者轮询时间,伪自选锁等待周期
key: # 订阅鉴权key
secretargname: secret # 订阅鉴权参数名
expireargname: expire # 订阅鉴权失效时间参数名
internal: false # 是否内部订阅,内部订阅不会触发发布者自动断开功能
enableavcc : true # 启用AVCC格式缓存,用于rtmp协议
enablertp : true # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
enableauth: true # 启用鉴权,详细查看鉴权机制
enablesubevent: true # 启用订阅事件,用于订阅者上下线事件,关闭可以提高性能
rtpreroderbufferlen: 50 # rtp乱序重排缓存长度
speedlimit: 500ms # 限速超时时间 0为不限速,对于读取文件这类流需要限速,否则读取过快
eventbussize: 10 # 事件总线缓存大小,事件较多时容易堵阻塞线程,需要增大缓存
pulseinterval: 5s # 心跳事件间隔时间
console:
server : console.monibuca.com:4242 # 连接远程控制台的地址
secret: "" # 远程控制台的秘钥
publicaddr: "" # 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址
publicaddrtls: "" # 实例公网地址,提供远程控制台访问的地址,不配置的话使用自动识别的地址(https)
配置覆盖机制
- 如果不存在配置文件,将使用默认配置,该配置值为代码中写死的配置值
- 如果存在配置文件,则使用配置文件中的值覆盖默认值
- http、publish、subscribe三个配置遵循优先级顺序
- 如果发布流或者订阅流中包含对应的参数,则优先使用
- 其次,查找对应插件的配置项中是否包含配置项
- 最后,使用全局配置中的配置
流的状态图
stateDiagram-v2
[*] --> ⌛等待发布者 : 创建
⌛等待发布者 --> 🟢正在发布 :发布
⌛等待发布者 --> 🔴已关闭 :关闭
⌛等待发布者 --> 🔴已关闭 :超时
⌛等待发布者 --> 🔴已关闭 :最后订阅者离开
🟢正在发布 --> ⌛等待发布者: 发布者断开
🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
🟢正在发布 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
🟡等待关闭 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🔴已关闭 :超时
🟡等待关闭 --> 🔴已关闭 :发布者断开
鉴权机制
默认鉴权
在publish 和 subscribe 中配置 key 引擎会自动进行鉴权, 推流或者拉流时需要在url中添加参数 secret=xxx&expire=xxx。
- secret为鉴权前面,MD5(key+StreamPath+expire)
- expire为鉴权失效时间,格式是十六进制 UNIX 时间戳
时间戳计算
设置时间:2018.12.01 08:30:00
十进制 UNIX 时间戳:1543624200
十六进制 UNIX 时间戳:5C01D608(云直播鉴权配置使用十六进制 UNIX 时间戳,十六进制不区分字母大小写)
鉴权签名计算
secret = MD5(key+StreamPath+expire)
secret = MD5(ngoeiq03+test/01+5C01D608)
secret = MD5(ngoeiq03test/015C01D608)
secret = ce797dc6238156d548ef945e6ad1ea20
单独鉴权
如果需要自定义鉴权,可以在插件中实现鉴权接口, 引擎中定义如下两个接口,插件中的发布者或者订阅者可以实现这两个接口,引擎会在发布或者订阅时调用这两个接口进行鉴权
type AuthSub interface {
OnAuth(*util.Promise[ISubscriber]) error
}
type AuthPub interface {
OnAuth(*util.Promise[IPublisher]) error
}
- OnAuth返回错误即鉴权失败
- Promise方便异步鉴权,可以后续调用其Resolve或Reject方法进行鉴权结果的返回
全局鉴权
自定义鉴权也可以全局生效, 引擎中定义如下两个全局函数的变量,插件中可以对这两个变量进行赋值,引擎会在发布或者订阅时调用这两个接口进行鉴权
var OnAuthSub func(p *util.Promise[ISubscriber]) error
var OnAuthPub func(p *util.Promise[IPublisher]) error
** 注意:如果单独鉴权和全局鉴权同时存在,优先使用单独鉴权 ** ** 全局鉴权函数可以被多次覆盖,所以需要自己实现鉴权逻辑的合并 **
Http中间件
在HTTPConfig接口中增加了AddMiddleware方法,可以通过该方法添加中间件,中间件的定义如下
type Middleware func(string, http.Handler) http.Handler
type HTTPConfig interface {
GetHTTPConfig() *HTTP
Listen(ctx context.Context) error
Handle(string, http.Handler)
AddMiddleware(Middleware)
}
中间件的添加必须在FirstConfig之前,也就是在Listen之前 例如:
type MyMiddlewareConfig struct {
config.HTTP
}
var myMiddlewareConfig = &MyMiddlewareConfig{}
func init(){
myMiddlewareConfig.AddMiddleware(func(pattern string, handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// do something
handler.ServeHTTP(w, r)
})
})
}
Documentation
¶
Index ¶
- Constants
- Variables
- func Run(ctx context.Context, configFile string) (err error)
- func ShouldYaml(r *http.Request) bool
- type AddTrackEvent
- type AudioDeConf
- type AudioFrame
- type AudioRTP
- type AuthPub
- type AuthSub
- type ClientConfig
- type ClientIO
- type DefaultYaml
- type ErrorEvent
- type Event
- type FLVFrame
- type FirstConfig
- type GlobalConfig
- func (conf *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_stopPush(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Request)
- func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)
- type HasAnnexB
- type IIO
- type IO
- type IOConfig
- type IPublisher
- type IPuller
- type IPusher
- type ISubscriber
- type MP4Publisher
- type MemoryTs
- func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error)
- func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error)
- func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID)
- func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error)
- func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error)
- type NetWorkInfo
- type NoMoreTrack
- type Plugin
- func (opt *Plugin) Publish(streamPath string, pub IPublisher) error
- func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error)
- func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error)
- func (opt *Plugin) Save() error
- func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error
- func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error)
- func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error
- func (opt *Plugin) Update(conf config.Config)
- type Publisher
- func (p *Publisher) Equal(p2 IPublisher) bool
- func (p *Publisher) GetPublisher() *Publisher
- func (p *Publisher) OnEvent(event any)
- func (p *Publisher) Publish(streamPath string, pub IPublisher) error
- func (p *Publisher) Stop()
- func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPool)
- func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPool)
- type Puller
- type PulseEvent
- type Pusher
- type RTPDumpPublisher
- type SEKick
- type SEclose
- type SEcreate
- type SEpublish
- type SErepublish
- type SEwaitClose
- type SEwaitPublish
- type StateEvent
- type Stream
- func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track])
- func (r *Stream) Close()
- func (s *Stream) GetPublisherConfig() *config.Publish
- func (s *Stream) GetStartTime() time.Time
- func (s *Stream) GetType() string
- func (r *Stream) IsClosed() bool
- func (r *Stream) IsShutdown() bool
- func (s *Stream) Pause()
- func (s *Stream) Receive(event any) bool
- func (s *Stream) RemoveTrack(t Track)
- func (s *Stream) Resume()
- func (s *Stream) SSRC() uint32
- func (s *Stream) SetIDR(video Track)
- func (s *Stream) Summary() (r StreamSummay)
- type StreamAction
- type StreamEvent
- type StreamList
- type StreamState
- type StreamSummay
- type StreamTimeoutConfig
- type SubPulse
- type Subscriber
- func (s *Subscriber) AddTrack(t Track) bool
- func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader)
- func (s *Subscriber) GetSubscriber() *Subscriber
- func (s *Subscriber) IsPlaying() bool
- func (s *Subscriber) OnEvent(event any)
- func (s *Subscriber) PlayBlock(subType byte)
- func (s *Subscriber) PlayFLV()
- func (s *Subscriber) PlayRTP()
- func (s *Subscriber) PlayRaw()
- func (s *Subscriber) SetIO(i any)
- func (s *Subscriber) SubPulse()
- func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error
- type Subscribers
- func (s *Subscribers) AbortWait()
- func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks)
- func (s *Subscribers) Broadcast(event any)
- func (s *Subscribers) Delete(suber ISubscriber)
- func (s *Subscribers) Dispose()
- func (s *Subscribers) Init()
- func (s *Subscribers) Len() int
- func (s *Subscribers) MarshalJSON() ([]byte, error)
- func (s *Subscribers) OnPublisherLost(event StateEvent)
- func (s *Subscribers) OnTrack(track common.Track)
- func (s *Subscribers) Pick() ISubscriber
- func (s *Subscribers) RangeAll(f func(sub ISubscriber))
- type Summary
- type TSPublisher
- type TrackPlayer
- type TrackRemoved
- type Tracks
- type Unsubscribe
- type UnsubscribeEvent
- type VideoDeConf
- type VideoFrame
- type VideoRTP
Constants ¶
const ( NO_SUCH_CONIFG = "no such config" NO_SUCH_STREAM = "no such stream" )
const ( SUBTYPE_RAW = iota SUBTYPE_AVCC SUBTYPE_RTP SUBTYPE_FLV )
const ( SUBSTATE_INIT = iota SUBSTATE_FIRST SUBSTATE_NORMAL )
Variables ¶
var ( ErrBadStreamName = errors.New("Stream Already Exist") ErrBadTrackName = errors.New("Track Already Exist") ErrTrackMute = errors.New("Track Mute") ErrStreamIsClosed = errors.New("Stream Is Closed") ErrPublisherLost = errors.New("Publisher Lost") ErrAuth = errors.New("Auth Failed") OnAuthSub func(p *util.Promise[ISubscriber]) error OnAuthPub func(p *util.Promise[IPublisher]) error )
var ( SysInfo struct { StartTime time.Time //启动时间 LocalIP string Version string } ExecPath = os.Args[0] ExecDir = filepath.Dir(ExecPath) // ConfigRaw 配置信息的原始数据 ConfigRaw []byte Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置 EngineConfig = &GlobalConfig{} Engine = InstallPlugin(EngineConfig) SettingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置 MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置 EventBus chan any )
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
var ErrNoPullConfig = errors.New("no pull config")
var ErrNoPushConfig = errors.New("no push config")
var ErrStreamNotExist = errors.New("stream not exist")
var Pullers sync.Map
var Pushers sync.Map
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ { ACTION_PUBLISH: STATE_PUBLISHING, ACTION_TIMEOUT: STATE_CLOSED, ACTION_LASTLEAVE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED, }, { ACTION_PUBLISHLOST: STATE_WAITPUBLISH, ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_CLOSE: STATE_CLOSED, }, { ACTION_PUBLISHLOST: STATE_CLOSED, ACTION_TIMEOUT: STATE_CLOSED, ACTION_FIRSTENTER: STATE_PUBLISHING, ACTION_CLOSE: STATE_CLOSED, }, {}, }
var Streams util.Map[string, *Stream]
Streams 所有的流集合
Functions ¶
func ShouldYaml ¶ added in v4.12.6
Types ¶
type AddTrackEvent ¶ added in v4.11.18
type AudioDeConf ¶
type AudioDeConf []byte
AVCC 格式的序列帧
func (AudioDeConf) WithOutRTMP ¶ added in v4.11.0
func (a AudioDeConf) WithOutRTMP() []byte
type AudioFrame ¶
func (AudioFrame) GetADTS ¶ added in v4.11.11
func (a AudioFrame) GetADTS() (r net.Buffers)
func (AudioFrame) WriteRawTo ¶ added in v4.13.0
func (a AudioFrame) WriteRawTo(w io.Writer) (n int64, err error)
type ClientIO ¶ added in v4.1.0
type ClientIO[C ClientConfig] struct { Config *C StreamPath string // 本地流标识 RemoteURL string // 远程服务器地址(用于推拉) ReConnectCount int //重连次数 }
ClientIO 作为Client角色(Puller,Pusher)的公共结构体
type DefaultYaml ¶ added in v4.9.6
type DefaultYaml string
type ErrorEvent ¶ added in v4.11.18
ErrorEvent 错误事件
type Event ¶ added in v4.11.18
func CreateEvent ¶ added in v4.11.18
type FirstConfig ¶
type GlobalConfig ¶
func (*GlobalConfig) API_closeStream ¶
func (conf *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_getConfig ¶
func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request)
API_getConfig 获取指定的配置信息
func (*GlobalConfig) API_list_pull ¶ added in v4.6.4
func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_list_push ¶ added in v4.6.4
func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_modifyConfig ¶
func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Request)
API_modifyConfig 修改并保存配置
func (*GlobalConfig) API_plugins ¶ added in v4.1.4
func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_replay_mp4 ¶ added in v4.11.17
func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_replay_rtpdump ¶ added in v4.8.6
func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_replay_ts ¶ added in v4.9.8
func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_stopPush ¶ added in v4.7.2
func (conf *GlobalConfig) API_stopPush(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_stream ¶ added in v4.1.0
func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_summary ¶
func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_sysInfo ¶
func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_updateConfig ¶
func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Request)
API_updateConfig 热更新配置
func (*GlobalConfig) ServeHTTP ¶
func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)
type IO ¶
type IO struct { ID string Type string context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过OnEvent传入父级Context context.CancelFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者 *log.Logger `json:"-" yaml:"-"` StartTime time.Time //创建时间 Stream *Stream `json:"-" yaml:"-"` io.Reader `json:"-" yaml:"-"` io.Writer `json:"-" yaml:"-"` io.Closer `json:"-" yaml:"-"` Args url.Values Spesific IIO `json:"-" yaml:"-"` }
发布者或者订阅者的共用结构体
func (*IO) IsShutdown ¶ added in v4.9.0
type IPublisher ¶
type IPublisher interface { IIO GetPublisher() *Publisher Publish(streamPath string, pub IPublisher) error // contains filtered or unexported methods }
type IPuller ¶
type IPuller interface { IPublisher Connect() error Pull() error Reconnect() bool // contains filtered or unexported methods }
type IPusher ¶
type IPusher interface { ISubscriber Push() error Connect() error Reconnect() bool // contains filtered or unexported methods }
type ISubscriber ¶
type ISubscriber interface { IIO GetSubscriber() *Subscriber IsPlaying() bool PlayRaw() PlayBlock(byte) PlayFLV() Stop() Subscribe(streamPath string, sub ISubscriber) error }
type MP4Publisher ¶ added in v4.11.17
type MP4Publisher struct { Publisher *mp4.MovDemuxer `json:"-" yaml:"-"` }
func (*MP4Publisher) ReadMP4Data ¶ added in v4.11.17
func (p *MP4Publisher) ReadMP4Data(source io.ReadSeeker) error
Start reading the MP4 file
type MemoryTs ¶ added in v4.11.5
func (*MemoryTs) WriteAudioFrame ¶ added in v4.11.5
func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error)
func (*MemoryTs) WritePESPacket ¶ added in v4.11.5
func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error)
func (*MemoryTs) WritePMTPacket ¶ added in v4.11.6
func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID)
func (*MemoryTs) WriteVideoFrame ¶ added in v4.11.5
func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error)
type NetWorkInfo ¶
type NetWorkInfo struct { Name string Receive uint64 Sent uint64 ReceiveSpeed uint64 SentSpeed uint64 }
NetWorkInfo 网速信息
type NoMoreTrack ¶ added in v4.11.18
type NoMoreTrack struct{}
type Plugin ¶
type Plugin struct { context.Context `json:"-" yaml:"-"` context.CancelFunc `json:"-" yaml:"-"` Name string //插件名称 Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置 Version string //插件版本 Yaml string //配置文件中的配置项 RawConfig config.Config //最终合并后的配置的map形式方便查询 Modified config.Config //修改过的配置项 *log.Logger `json:"-" yaml:"-"` Disabled bool // contains filtered or unexported fields }
Plugin 插件信息
func InstallPlugin ¶
InstallPlugin 安装插件,传入插件配置生成插件信息对象
func (*Plugin) Subscribe ¶
func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error
Subscribe 订阅一个流,如果流不存在则创建一个等待流
func (*Plugin) SubscribeBlock ¶
func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error)
SubscribeBlock 阻塞订阅一个流,直到订阅结束
func (*Plugin) SubscribeExist ¶ added in v4.8.8
func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error
SubscribeExist 订阅已经存在的流
type Publisher ¶
type Publisher struct { IO Config *config.Publish common.AudioTrack `json:"-" yaml:"-"` common.VideoTrack `json:"-" yaml:"-"` }
func (*Publisher) Equal ¶
func (p *Publisher) Equal(p2 IPublisher) bool
func (*Publisher) GetPublisher ¶ added in v4.9.0
func (*Publisher) Publish ¶ added in v4.12.4
func (p *Publisher) Publish(streamPath string, pub IPublisher) error
func (*Publisher) WriteAVCCAudio ¶ added in v4.1.0
type RTPDumpPublisher ¶ added in v4.8.6
type RTPDumpPublisher struct { Publisher VCodec codec.VideoCodecID ACodec codec.AudioCodecID VPayloadType uint8 APayloadType uint8 sync.Mutex // contains filtered or unexported fields }
func (*RTPDumpPublisher) Feed ¶ added in v4.11.17
func (t *RTPDumpPublisher) Feed(file *os.File)
func (*RTPDumpPublisher) WriteRTP ¶ added in v4.11.17
func (t *RTPDumpPublisher) WriteRTP(raw []byte)
type SEclose ¶
type SEclose struct {
StateEvent
}
type SEcreate ¶ added in v4.11.18
type SEcreate struct {
StreamEvent
}
type SEpublish ¶
type SEpublish struct {
StateEvent
}
type SErepublish ¶ added in v4.11.1
type SErepublish struct {
StateEvent
}
type SEwaitClose ¶
type SEwaitClose struct {
StateEvent
}
type SEwaitPublish ¶
type SEwaitPublish struct { StateEvent Publisher IPublisher }
type StateEvent ¶
type StateEvent struct { StreamEvent Action StreamAction From StreamState }
StateEvent 状态机事件
func (StateEvent) Next ¶
func (se StateEvent) Next() (next StreamState, ok bool)
type Stream ¶
type Stream struct { *log.Logger StartTime time.Time //创建时间 StreamTimeoutConfig Path string Publisher IPublisher State StreamState SEHistory []StateEvent // 事件历史 Subscribers Subscribers // 订阅者 Tracks Tracks AppName string StreamName string IsPause bool // 是否处于暂停状态 // contains filtered or unexported fields }
Stream 流定义
func FilterStreams ¶
func FilterStreams[T IPublisher]() (ss []*Stream)
func (*Stream) GetPublisherConfig ¶ added in v4.11.0
func (*Stream) GetStartTime ¶ added in v4.11.14
func (*Stream) IsShutdown ¶ added in v4.6.5
func (*Stream) RemoveTrack ¶
func (s *Stream) RemoveTrack(t Track)
func (*Stream) Summary ¶ added in v4.1.0
func (s *Stream) Summary() (r StreamSummay)
Summary 返回流的简要信息
type StreamAction ¶
type StreamAction byte
const ( ACTION_PUBLISH StreamAction = iota ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到 ACTION_PUBLISHLOST // 发布者意外断开 ACTION_CLOSE // 主动关闭流 ACTION_LASTLEAVE // 最后一个订阅者离开 ACTION_FIRSTENTER // 第一个订阅者进入 )
func (StreamAction) String ¶ added in v4.12.4
func (s StreamAction) String() string
type StreamEvent ¶ added in v4.11.18
type StreamList ¶ added in v4.8.2
type StreamList []*Stream
func GetSortedStreamList ¶ added in v4.8.2
func GetSortedStreamList() StreamList
func (StreamList) Len ¶ added in v4.8.2
func (l StreamList) Len() int
func (StreamList) Less ¶ added in v4.8.2
func (l StreamList) Less(i, j int) bool
func (StreamList) Sort ¶ added in v4.8.2
func (l StreamList) Sort()
func (StreamList) Swap ¶ added in v4.8.2
func (l StreamList) Swap(i, j int)
type StreamState ¶
type StreamState byte
const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 STATE_PUBLISHING // 正在发布流状态 STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_CLOSED // 流已关闭,不可使用 )
四状态机
func (StreamState) String ¶ added in v4.12.4
func (s StreamState) String() string
type StreamSummay ¶ added in v4.1.0
type StreamTimeoutConfig ¶
type SubPulse ¶ added in v4.11.18
type SubPulse struct {
ISubscriber
}
type Subscriber ¶
type Subscriber struct { IO Config *config.Subscribe TrackPlayer `json:"-" yaml:"-"` }
Subscriber 订阅者实体定义
func (*Subscriber) AddTrack ¶
func (s *Subscriber) AddTrack(t Track) bool
func (*Subscriber) CreateTrackReader ¶ added in v4.11.6
func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader)
func (*Subscriber) GetSubscriber ¶ added in v4.9.0
func (s *Subscriber) GetSubscriber() *Subscriber
func (*Subscriber) IsPlaying ¶
func (s *Subscriber) IsPlaying() bool
func (*Subscriber) OnEvent ¶
func (s *Subscriber) OnEvent(event any)
func (*Subscriber) PlayFLV ¶ added in v4.3.0
func (s *Subscriber) PlayFLV()
func (*Subscriber) PlayRTP ¶ added in v4.4.0
func (s *Subscriber) PlayRTP()
func (*Subscriber) PlayRaw ¶ added in v4.3.0
func (s *Subscriber) PlayRaw()
func (*Subscriber) SetIO ¶ added in v4.12.2
func (s *Subscriber) SetIO(i any)
func (*Subscriber) SubPulse ¶ added in v4.11.18
func (s *Subscriber) SubPulse()
func (*Subscriber) Subscribe ¶ added in v4.12.4
func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error
type Subscribers ¶ added in v4.9.3
type Subscribers struct {
// contains filtered or unexported fields
}
func (*Subscribers) AbortWait ¶ added in v4.9.5
func (s *Subscribers) AbortWait()
func (*Subscribers) Add ¶ added in v4.9.3
func (s *Subscribers) Add(suber ISubscriber, wait *waitTracks)
func (*Subscribers) Broadcast ¶ added in v4.9.3
func (s *Subscribers) Broadcast(event any)
func (*Subscribers) Delete ¶ added in v4.9.3
func (s *Subscribers) Delete(suber ISubscriber)
func (*Subscribers) Dispose ¶ added in v4.9.5
func (s *Subscribers) Dispose()
func (*Subscribers) Init ¶ added in v4.9.5
func (s *Subscribers) Init()
func (*Subscribers) Len ¶ added in v4.9.5
func (s *Subscribers) Len() int
func (*Subscribers) MarshalJSON ¶ added in v4.9.3
func (s *Subscribers) MarshalJSON() ([]byte, error)
func (*Subscribers) OnPublisherLost ¶ added in v4.9.5
func (s *Subscribers) OnPublisherLost(event StateEvent)
func (*Subscribers) OnTrack ¶ added in v4.9.5
func (s *Subscribers) OnTrack(track common.Track)
func (*Subscribers) Pick ¶ added in v4.9.3
func (s *Subscribers) Pick() ISubscriber
func (*Subscribers) RangeAll ¶ added in v4.9.5
func (s *Subscribers) RangeAll(f func(sub ISubscriber))
type Summary ¶
type Summary struct { Address string Memory struct { Total uint64 Free uint64 Used uint64 Usage float64 } CPUUsage float64 HardDisk struct { Total uint64 Free uint64 Used uint64 Usage float64 } NetWork []NetWorkInfo Streams []StreamSummay // contains filtered or unexported fields }
ServerSummary 系统摘要定义
type TSPublisher ¶
type TSPublisher struct { Publisher mpegts.MpegTsStream `json:"-" yaml:"-"` // contains filtered or unexported fields }
func (*TSPublisher) OnEvent ¶
func (t *TSPublisher) OnEvent(event any)
func (*TSPublisher) OnPmtStream ¶
func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream)
func (*TSPublisher) ReadPES ¶ added in v4.9.8
func (t *TSPublisher) ReadPES()
type TrackPlayer ¶
type TrackPlayer struct { context.Context context.CancelFunc AudioReader, VideoReader *track.AVRingReader Audio *track.Audio Video *track.Video }
type TrackRemoved ¶
type TrackRemoved struct {
Track
}
type Tracks ¶ added in v4.5.1
func (*Tracks) MarshalJSON ¶ added in v4.5.1
type Unsubscribe ¶ added in v4.11.18
type Unsubscribe ISubscriber
type UnsubscribeEvent ¶ added in v4.9.0
type UnsubscribeEvent struct { Event[ISubscriber] }
type VideoDeConf ¶
type VideoDeConf []byte
AVCC 格式的序列帧
func (VideoDeConf) WithOutRTMP ¶ added in v4.11.0
func (v VideoDeConf) WithOutRTMP() []byte
type VideoFrame ¶
func (VideoFrame) GetAnnexB ¶ added in v4.2.0
func (v VideoFrame) GetAnnexB() (r net.Buffers)
func (VideoFrame) WriteAnnexBTo ¶ added in v4.11.0
func (v VideoFrame) WriteAnnexBTo(w io.Writer) (n int64, err error)