engine

package module
v4.0.0-alpha9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2022 License: MIT Imports: 32 Imported by: 56

README

m7s v4核心引擎

该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。包含了一个插件的引入机制,其他功能均由插件实现

引擎的基本功能

  • 引擎初始化会加载配置文件,引入的插件会自动注册到引擎中
  • 配置文件中配置会被解析并覆盖插件的默认配置
  • 引擎提供配置热更新机制(具体热更新逻辑由插件实现)
  • 读取插件的特殊方法,将其注册为可供HTTP访问的API接口
  • 具有发布功能的插件,可以将流注入到引擎中
  • 具有订阅功能的插件,可以从引擎中订阅到流
  • 引擎会将流中的数据放入RingBuffer中缓存,以便插件可以获取数据
  • 引擎提供了从远端拉流和以及向远端推流的基础框架
  • 引擎包了zap日志框架
  • 引擎提供事件总线机制,可以对所有插件广播事件

引擎自带HTTP接口

  • 终止某一个流 /api/closeStream?streamPath=xxx
  • 获取engine信息 /api/sysInfo 返回值{Version:xxx,StartTime:xxx}
  • 获取系统基本情况 /api/summary 返回值Summary数据

引擎默认配置

global:
  http:
    # 网关地址,用于访问API
    listenaddr: :8080
    # 用于HTTPS方式访问API的端口配置
    listenaddrtls: ""
    certfile: ""
    keyfile: ""
    # 是否自动添加cors头
    cors: true
    # 用户名和密码,用于API访问时的基本身份认证
    username: ""
    password: ""
  publish:
      # 是否发布音频流
      pubaudio: true
      # 是否发布视频流
      pubvideo: true
      # 剔出已经存在的发布者,用于顶替原有发布者
      kickexist: false
      # 发布流默认过期时间单位秒,超过该时间发布者没有恢复流将被删除
      publishtimeout: 10
      # 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭)
      waitclosetimeout: 0
  subscribe:
      # 是否订阅音频流
      subaudio: true
      # 是否订阅视频流
      subvideo: true
      # 只订阅关键帧
      iframeonly: false
      # 等待发布者的秒数,用于订阅尚未发布的流
      waittimeout: 10
  # 启用RTP包乱序重排
  rtpreorder : false
  # 启用AVCC格式缓存,用于rtmp协议
  enableavcc : true
  # 启用rtp格式缓存,用于rtsp、websocket、gb28181协议
  enablertp : true
  # 启用flv格式缓存,用于HDL协议,以及flv格式写文件
  enableflv : true
  # 连接远程控制台的地址
  consoleurl : wss://console.monibuca.com:8080
  # 远程控制台的秘钥
  secret: ""

配置覆盖机制

  • 如果不存在配置文件,将使用默认配置,该配置值为代码中写死的配置值
  • 如果存在配置文件,则使用配置文件中的值覆盖默认值
  • http、publish、subscribe三个配置遵循优先级顺序
  1. 如果发布流或者订阅流中包含对应的参数,则优先使用
  2. 其次,查找对应插件的配置项中是否包含配置项
  3. 最后,使用全局配置中的配置

流的状态图

stateDiagram-v2
    [*] --> ⌛等待发布者 : 创建
    ⌛等待发布者 --> 🟢正在发布 :发布
    ⌛等待发布者 --> 🔴已关闭 :关闭
    ⌛等待发布者 --> 🔴已关闭  :超时
    ⌛等待发布者 --> 🔴已关闭  :最后订阅者离开
    🟢正在发布 --> ⌛等待发布者: 发布者断开
    🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
    🟢正在发布 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
    🟡等待关闭 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🔴已关闭  :超时
    🟡等待关闭 --> 🔴已关闭  :发布者断开

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ExecPath = os.Args[0]
	ExecDir  = filepath.Dir(ExecPath)
	// ConfigRaw 配置信息的原始数据
	ConfigRaw    []byte
	StartTime    time.Time                  //启动时间
	Plugins      = make(map[string]*Plugin) // Plugins 所有的插件配置
	EngineConfig = &GlobalConfig{
		Engine: config.Global,
	}

	Engine       = InstallPlugin(EngineConfig)              //复用安装插件逻辑,将全局配置信息注入,并启动server
	MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
	EventBus     = make(chan any, 10)
)
View Source
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
View Source
var BadNameErr = errors.New("Bad Name")
View Source
var NoPullConfigErr = errors.New("no pull config")
View Source
var NoPushConfigErr = errors.New("no push config")
View Source
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
View Source
var StreamIsClosedErr = errors.New("Stream Is Closed")
View Source
var Streams = util.Map[string, *Stream]{Map: make(map[string]*Stream)}

Streams 所有的流集合

Functions

func Run

func Run(ctx context.Context, configFile string) (err error)

Run 启动Monibuca引擎,传入总的Context,可用于关闭所有

Types

type AudioDeConf

type AudioDeConf DecoderConfiguration[AudioSlice]

func (AudioDeConf) GetAVCC

func (a AudioDeConf) GetAVCC() net.Buffers

func (AudioDeConf) GetFLV

func (a AudioDeConf) GetFLV() net.Buffers

type AudioFrame

type AudioFrame AVFrame[AudioSlice]

func (*AudioFrame) GetAVCC

func (a *AudioFrame) GetAVCC() net.Buffers

func (*AudioFrame) GetFLV

func (a *AudioFrame) GetFLV() net.Buffers

func (*AudioFrame) GetRTP

func (a *AudioFrame) GetRTP() []*RTPFrame

type Client

type Client[C ClientConfig] struct {
	Config         *C
	StreamPath     string // 本地流标识
	RemoteURL      string // 远程服务器地址(用于推拉)
	ReConnectCount int    //重连次数
}

type ClientConfig

type ClientConfig interface {
	config.Pull | config.Push
}

type FirstConfig

type FirstConfig config.Config

type GlobalConfig

type GlobalConfig struct {
	*config.Engine
}

func (*GlobalConfig) API_closeStream

func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_summary

func (config *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_sysInfo

func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) ServeHTTP

func (config *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)

type HaveAVCC

type HaveAVCC interface {
	GetAVCC() net.Buffers
}

type HaveFLV

type HaveFLV interface {
	GetFLV() net.Buffers
}

type HaveRTP

type HaveRTP interface {
	GetRTP() []*RTPFrame
}

type IIO

type IIO interface {
	IsClosed() bool
	OnEvent(any)
	Stop()
}

type IO

type IO[C IOConfig, S IIO] struct {
	ID                 string
	Type               string
	context.Context    //不要直接设置,应当通过OnEvent传入父级Context
	context.CancelFunc //流关闭是关闭发布者或者订阅者
	*zap.Logger
	StartTime time.Time //创建时间
	Stream    *Stream   `json:"-"`
	io.Reader `json:"-"`
	io.Writer `json:"-"`
	io.Closer `json:"-"`
	Args      url.Values
	Config    *C
}

func (*IO[C, S]) GetConfig

func (io *IO[C, S]) GetConfig() *C

func (*IO[C, S]) IsClosed

func (io *IO[C, S]) IsClosed() bool

func (*IO[C, S]) OnEvent

func (i *IO[C, S]) OnEvent(event any)

func (*IO[C, S]) SetIO

func (i *IO[C, S]) SetIO(conn any)

SetIO(可选) 设置Writer、Reader、Closer

func (*IO[C, S]) SetParentCtx

func (i *IO[C, S]) SetParentCtx(parent context.Context)

SetParentCtx(可选)

func (*IO[C, S]) Stop

func (io *IO[C, S]) Stop()

Stop 停止订阅或者发布,由订阅者或者发布者调用

type IOConfig

type IOConfig interface {
	config.Publish | config.Subscribe
}

type IPublisher

type IPublisher interface {
	IIO
	GetConfig() *config.Publish
	// contains filtered or unexported methods
}

type IPuller

type IPuller interface {
	IPublisher
	Connect() error
	Pull()
	Reconnect() bool
	// contains filtered or unexported methods
}

type IPusher

type IPusher interface {
	ISubscriber
	Push()
	Connect() error

	Reconnect() bool
	// contains filtered or unexported methods
}

type ISubscriber

type ISubscriber interface {
	IIO

	GetConfig() *config.Subscribe
	IsPlaying() bool
	Play(ISubscriber) func() error
	PlayBlock(ISubscriber)
	Stop()
	// contains filtered or unexported methods
}

type NetWorkInfo

type NetWorkInfo struct {
	Name         string
	Receive      uint64
	Sent         uint64
	ReceiveSpeed uint64
	SentSpeed    uint64
}

NetWorkInfo 网速信息

type Plugin

type Plugin struct {
	context.Context    `json:"-"`
	context.CancelFunc `json:"-"`
	Name               string        //插件名称
	Config             config.Plugin //插件配置
	Version            string        //插件版本
	RawConfig          config.Config //配置的map形式方便查询
	Modified           config.Config //修改过的配置项
	*zap.Logger
}

Plugin 插件信息

func InstallPlugin

func InstallPlugin(config config.Plugin) *Plugin

InstallPlugin 安装插件,传入插件配置生成插件信息对象

func (*Plugin) Publish

func (opt *Plugin) Publish(streamPath string, pub IPublisher) error

func (*Plugin) Pull

func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error)

func (*Plugin) Push

func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error)

func (*Plugin) Save

func (opt *Plugin) Save() error

func (*Plugin) Subscribe

func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error

func (*Plugin) SubscribeBlock

func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber) (err error)

func (*Plugin) Update

func (opt *Plugin) Update(conf config.Config)

Update 热更新配置

type Publisher

func (*Publisher) OnEvent

func (p *Publisher) OnEvent(event any)

func (*Publisher) Stop

func (p *Publisher) Stop()

type Puller

type Puller struct {
	Client[config.Pull]
}

用于远程拉流的发布者

func (*Puller) Reconnect

func (pub *Puller) Reconnect() (ok bool)

是否需要重连

type Pusher

type Pusher struct {
	Client[config.Push]
}

func (*Pusher) Reconnect

func (pub *Pusher) Reconnect() bool

是否需要重连

type SEKick

type SEKick struct {
}

type SEclose

type SEclose struct {
	StateEvent
}

type SEpublish

type SEpublish struct {
	StateEvent
}

type SEwaitClose

type SEwaitClose struct {
	StateEvent
}

type SEwaitPublish

type SEwaitPublish struct {
	StateEvent
	Publisher IPublisher
}

type StateEvent

type StateEvent struct {
	Action StreamAction
	From   StreamState
	Stream *Stream
}

func (StateEvent) Next

func (se StateEvent) Next() (next StreamState, ok bool)

type Stream

type Stream struct {
	*zap.Logger
	StartTime time.Time //创建时间
	StreamTimeoutConfig
	Path        string
	Publisher   IPublisher
	State       StreamState
	Subscribers []ISubscriber // 订阅者
	Tracks      map[string]Track
	AppName     string
	StreamName  string
	// contains filtered or unexported fields
}

Stream 流定义

func FilterStreams

func FilterStreams[T IPublisher]() (ss []*Stream)

func (*Stream) AddTrack

func (s *Stream) AddTrack(t Track)

func (*Stream) Close

func (s *Stream) Close()

func (*Stream) IsClosed

func (r *Stream) IsClosed() bool

func (*Stream) NewAudioTrack

func (r *Stream) NewAudioTrack() (at *track.UnknowAudio)

func (*Stream) NewDataTrack

func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data)

func (*Stream) NewVideoTrack

func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo)

如果暂时不知道编码格式可以用这个

func (*Stream) Receive

func (s *Stream) Receive(event any) bool

func (*Stream) RemoveTrack

func (s *Stream) RemoveTrack(t Track)

func (*Stream) SSRC

func (s *Stream) SSRC() uint32

type StreamAction

type StreamAction byte
const (
	ACTION_PUBLISH     StreamAction = iota
	ACTION_TIMEOUT                  // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
	ACTION_PUBLISHLOST              // 发布者意外断开
	ACTION_CLOSE                    // 主动关闭流
	ACTION_LASTLEAVE                // 最后一个订阅者离开
	ACTION_FIRSTENTER               // 第一个订阅者进入
)

type StreamState

type StreamState byte
const (
	STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
	STATE_PUBLISHING                     // 正在发布流状态
	STATE_WAITCLOSE                      // 等待关闭状态(自动关闭延时开启)
	STATE_CLOSED                         // 流已关闭,不可使用
)

四状态机

type StreamTimeoutConfig

type StreamTimeoutConfig struct {
	WaitTimeout      time.Duration
	PublishTimeout   time.Duration
	WaitCloseTimeout time.Duration
}

type Subscriber

type Subscriber struct {
	IO[config.Subscribe, ISubscriber]
	TrackPlayer
}

Subscriber 订阅者实体定义

func (*Subscriber) AddTrack

func (s *Subscriber) AddTrack(t Track) bool

func (*Subscriber) IsPlaying

func (s *Subscriber) IsPlaying() bool

func (*Subscriber) OnEvent

func (s *Subscriber) OnEvent(event any)

func (*Subscriber) Play

func (s *Subscriber) Play(spesic ISubscriber) func() error

非阻塞式读取,通过反复调用返回的函数可以尝试读取数据,读取到数据后会调用OnEvent,这种模式自由的在不同的goroutine中调用

func (*Subscriber) PlayBlock

func (s *Subscriber) PlayBlock(spesic ISubscriber)

PlayBlock 阻塞式读取数据

func (*Subscriber) Stop

func (s *Subscriber) Stop()

type Summary

type Summary struct {
	Address string
	Memory  struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	CPUUsage float64
	HardDisk struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	NetWork []NetWorkInfo
	Streams []*Stream
	// contains filtered or unexported fields
}

ServerSummary 系统摘要定义

func (*Summary) Add

func (s *Summary) Add()

Add 增加订阅者

func (*Summary) Done

func (s *Summary) Done()

Done 删除订阅者

func (*Summary) Point

func (s *Summary) Point() *Summary

func (*Summary) Report

func (s *Summary) Report(slave *Summary)

Report 上报数据

func (*Summary) Running

func (s *Summary) Running() bool

Running 是否正在采集数据

func (*Summary) Start

func (s *Summary) Start()

StartSummary 开始定时采集数据,每秒一次

type TrackPlayer

type TrackPlayer struct {
	context.Context
	context.CancelFunc
	AudioTrack *track.Audio
	VideoTrack *track.Video
	// contains filtered or unexported fields
}

type TrackRemoved

type TrackRemoved struct {
	Track
}

type VideoDeConf

type VideoDeConf DecoderConfiguration[NALUSlice]

func (VideoDeConf) GetAVCC

func (a VideoDeConf) GetAVCC() net.Buffers

func (VideoDeConf) GetFLV

func (a VideoDeConf) GetFLV() net.Buffers

type VideoFrame

type VideoFrame AVFrame[NALUSlice]

func (*VideoFrame) GetAVCC

func (v *VideoFrame) GetAVCC() net.Buffers

func (*VideoFrame) GetFLV

func (v *VideoFrame) GetFLV() net.Buffers

func (*VideoFrame) GetRTP

func (v *VideoFrame) GetRTP() []*RTPFrame

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL