engine

package module
v4.0.0-alpha4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2022 License: MIT Imports: 32 Imported by: 56

README

m7s核心引擎

该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。仅包含最基础的功能,不含任何网络协议部分,但包含了一个插件的引入机制,其他功能均由插件实现

引擎配置

[Engine]
EnableAudio = true
EnableVideo = true
# 发布流默认过期时间单位秒
PublishTimeout = 60
# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭)
AutoCloseDelay = 10
# 启用RTP包乱序重排
RTPReorder = false

流的状态图

stateDiagram-v2
    [*] --> ⌛等待发布者 : 创建
    ⌛等待发布者 --> 🟢正在发布 :发布
    ⌛等待发布者 --> 🔴已关闭 :关闭
    ⌛等待发布者 --> 🔴已关闭  :超时
    ⌛等待发布者 --> 🔴已关闭  :最后订阅者离开
    🟢正在发布 --> ⌛等待发布者: 发布者断开
    🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
    🟢正在发布 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
    🟡等待关闭 --> 🔴已关闭  :关闭
    🟡等待关闭 --> 🔴已关闭  :超时
    🟡等待关闭 --> 🔴已关闭  :发布者断开

引擎的基本功能

  • 引擎初始化会加载配置文件,并逐个调用插件的Run函数
  • 具有发布功能的插件,新建一个Stream对象,这个Stream对象随后可以被订阅
  • Stream对象中含有两个列表,一个是VideoTracks一个是AudioTracks用来存放视频数据和音频数据
  • 每一个VideoTrack或者AudioTrack中包含一个RingBuffer,用来存储发布者提供的数据,同时提供订阅者访问。
  • 具有订阅功能的插件,会通过GetStream函数获取到一个流,然后选择VideoTracks、AudioTracks里面的RingBuffer进行连续的读取

发布插件如何发布流

以rtmp协议为例子

stream = &engine.Stream{Type: "RTMP", StreamPath: streamPath}
if stream.Publish() {
  absTs := make(map[uint32]uint32)
  vt := stream.NewVideoTrack(0)
  at := stream.NewAudioTrack(0)
  rec_audio = func(msg *Chunk) {
    if msg.ChunkType == 0 {
      absTs[msg.ChunkStreamID] = 0
    }
    if msg.Timestamp == 0xffffff {
      absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
    } else {
      absTs[msg.ChunkStreamID] += msg.Timestamp
    }
    at.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
  }
  rec_video = func(msg *Chunk) {
    if msg.ChunkType == 0 {
      absTs[msg.ChunkStreamID] = 0
    }
    if msg.Timestamp == 0xffffff {
      absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
    } else {
      absTs[msg.ChunkStreamID] += msg.Timestamp
    }
    vt.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
  }
  err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
  err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
  err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
}

默认会创建一个VideoTrack和一个AudioTrack 当我们接收到数据的时候就可以朝里面填充物数据了

在填充数据之前,需要获取到SPS和PPS,然后设置好,因为订阅者需要先发送这个数据 然后通过Track到Push函数将数据填充到RingBuffer里面去

订阅插件如何订阅流

sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()}
if err := sub.Subscribe(stringPath); err == nil {
  vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack()
  var buffer bytes.Buffer
  if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil {
    return
  }
  if vt != nil {
    codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload)
    sub.OnVideo = func(ts uint32, pack *VideoPack) {
      codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload)
    }
  }
  if at != nil {
    if at.CodecID == 10 {
      codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData)
    }
    sub.OnAudio = func(ts uint32, pack *AudioPack) {
      codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload)
    }
  }
  sub.Play(at, vt)
}
  • 在发送数据前,需要先发送音视频的序列帧

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ExecPath = os.Args[0]
	ExecDir  = filepath.Dir(ExecPath)
	// ConfigRaw 配置信息的原始数据
	ConfigRaw    []byte
	StartTime    time.Time                  //启动时间
	Plugins      = make(map[string]*Plugin) // Plugins 所有的插件配置
	EngineConfig = &GlobalConfig{
		Engine: config.Global,
	}

	Engine       = InstallPlugin(EngineConfig)              //复用安装插件逻辑,将全局配置信息注入,并启动server
	MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
	EventBus     = make(chan any, 10)
)
View Source
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
View Source
var BadNameErr = errors.New("Bad Name")
View Source
var NoPullConfigErr = errors.New("no pull config")
View Source
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
View Source
var StreamIsClosedErr = errors.New("Stream Is Closed")
View Source
var Streams = util.Map[string, *Stream]{Map: make(map[string]*Stream)}

Streams 所有的流集合

Functions

func Run

func Run(ctx context.Context, configFile string) (err error)

Run 启动Monibuca引擎,传入总的Context,可用于关闭所有

Types

type AudioDeConf

type AudioDeConf DecoderConfiguration[AudioSlice]

func (AudioDeConf) GetAVCC

func (a AudioDeConf) GetAVCC() net.Buffers

func (AudioDeConf) GetFLV

func (a AudioDeConf) GetFLV() net.Buffers

type AudioFrame

type AudioFrame AVFrame[AudioSlice]

func (*AudioFrame) GetAVCC

func (a *AudioFrame) GetAVCC() net.Buffers

func (*AudioFrame) GetFLV

func (a *AudioFrame) GetFLV() net.Buffers

func (*AudioFrame) GetRTP

func (a *AudioFrame) GetRTP() []*RTPFrame

type Client

type Client[C ClientConfig] struct {
	Config         *C
	StreamPath     string // 本地流标识
	RemoteURL      string // 远程服务器地址(用于推拉)
	ReConnectCount int    //重连次数
}

type ClientConfig

type ClientConfig interface {
	config.Pull | config.Push
}

type FirstConfig

type FirstConfig config.Config

type GlobalConfig

type GlobalConfig struct {
	*config.Engine
}

func (*GlobalConfig) API_closeStream

func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_summary

func (config *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) API_sysInfo

func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)

func (*GlobalConfig) ServeHTTP

func (config *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)

type HaveAVCC

type HaveAVCC interface {
	GetAVCC() net.Buffers
}

type HaveFLV

type HaveFLV interface {
	GetFLV() net.Buffers
}

type IIO

type IIO interface {
	IsClosed() bool
	OnEvent(any)
	Stop()
	// contains filtered or unexported methods
}

type IO

type IO[C IOConfig, S IIO] struct {
	ID                 string
	Type               string
	context.Context    //不要直接设置,应当通过OnEvent传入父级Context
	context.CancelFunc //流关闭是关闭发布者或者订阅者
	*zap.Logger
	StartTime time.Time //创建时间
	Stream    *Stream   `json:"-"`
	io.Reader `json:"-"`
	io.Writer `json:"-"`
	io.Closer `json:"-"`
	Args      url.Values
	Config    *C
}

func (*IO[C, S]) GetConfig

func (io *IO[C, S]) GetConfig() *C

func (*IO[C, S]) IsClosed

func (io *IO[C, S]) IsClosed() bool

func (*IO[C, S]) OnEvent

func (i *IO[C, S]) OnEvent(event any)

func (*IO[C, S]) SetIO

func (i *IO[C, S]) SetIO(conn any)

SetIO(可选) 设置Writer、Reader、Closer

func (*IO[C, S]) SetParentCtx

func (i *IO[C, S]) SetParentCtx(parent context.Context)

SetParentCtx(可选)

func (*IO[C, S]) Stop

func (io *IO[C, S]) Stop()

Stop 停止订阅或者发布,由订阅者或者发布者调用

type IOConfig

type IOConfig interface {
	config.Publish | config.Subscribe
}

type IPublisher

type IPublisher interface {
	IIO
	GetConfig() *config.Publish
	// contains filtered or unexported methods
}

type IPuller

type IPuller interface {
	IPublisher
	Connect() error
	Pull()
	Reconnect() bool
	// contains filtered or unexported methods
}

type IPusher

type IPusher interface {
	ISubscriber
	Push()
	Connect() error

	Reconnect() bool
	// contains filtered or unexported methods
}

type ISubscriber

type ISubscriber interface {
	IIO

	GetConfig() *config.Subscribe
	IsPlaying() bool
	Play(ISubscriber) func() error
	PlayBlock(ISubscriber)
	Stop()
	// contains filtered or unexported methods
}

type NetWorkInfo

type NetWorkInfo struct {
	Name         string
	Receive      uint64
	Sent         uint64
	ReceiveSpeed uint64
	SentSpeed    uint64
}

NetWorkInfo 网速信息

type Plugin

type Plugin struct {
	context.Context    `json:"-"`
	context.CancelFunc `json:"-"`
	Name               string        //插件名称
	Config             config.Plugin //插件配置
	Version            string        //插件版本
	RawConfig          config.Config //配置的map形式方便查询
	Modified           config.Config //修改过的配置项
	*zap.Logger
}

Plugin 插件信息

func InstallPlugin

func InstallPlugin(config config.Plugin) *Plugin

InstallPlugin 安装插件,传入插件配置生成插件信息对象

func (*Plugin) Publish

func (opt *Plugin) Publish(streamPath string, pub IPublisher) error

func (*Plugin) Pull

func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error)

func (*Plugin) Push

func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error)

func (*Plugin) Save

func (opt *Plugin) Save() error

func (*Plugin) Subscribe

func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error

func (*Plugin) Update

func (opt *Plugin) Update(conf config.Config)

Update 热更新配置

type Publisher

func (*Publisher) OnEvent

func (p *Publisher) OnEvent(event any)

type Puller

type Puller struct {
	Client[config.Pull]
}

用于远程拉流的发布者

func (*Puller) Reconnect

func (pub *Puller) Reconnect() (ok bool)

是否需要重连

type Pusher

type Pusher struct {
	Client[config.Push]
}

func (*Pusher) Reconnect

func (pub *Pusher) Reconnect() bool

是否需要重连

type SEKick

type SEKick struct {
}

type SEclose

type SEclose struct {
	StateEvent
}

type SEpublish

type SEpublish struct {
	StateEvent
}

type SEwaitClose

type SEwaitClose struct {
	StateEvent
}

type SEwaitPublish

type SEwaitPublish struct {
	StateEvent
	Publisher IPublisher
}

type StateEvent

type StateEvent struct {
	Action StreamAction
	From   StreamState
	Stream *Stream
}

func (StateEvent) Next

func (se StateEvent) Next() (next StreamState, ok bool)

type Stream

type Stream struct {
	*zap.Logger
	StartTime time.Time //创建时间
	StreamTimeoutConfig
	Path        string
	Publisher   IPublisher
	State       StreamState
	Subscribers []ISubscriber // 订阅者
	Tracks      map[string]Track
	AppName     string
	StreamName  string
	// contains filtered or unexported fields
}

Stream 流定义

func FilterStreams

func FilterStreams[T IPublisher]() (ss []*Stream)

func (*Stream) AddTrack

func (s *Stream) AddTrack(t Track)

func (*Stream) Close

func (s *Stream) Close()

func (*Stream) IsClosed

func (r *Stream) IsClosed() bool

func (*Stream) NewAudioTrack

func (r *Stream) NewAudioTrack() (at *track.UnknowAudio)

func (*Stream) NewDataTrack

func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data)

func (*Stream) NewVideoTrack

func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo)

如果暂时不知道编码格式可以用这个

func (*Stream) Receive

func (s *Stream) Receive(event any) bool

func (*Stream) RemoveTrack

func (s *Stream) RemoveTrack(t Track)

func (*Stream) SSRC

func (s *Stream) SSRC() uint32

type StreamAction

type StreamAction byte
const (
	ACTION_PUBLISH     StreamAction = iota
	ACTION_TIMEOUT                  // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
	ACTION_PUBLISHLOST              // 发布者意外断开
	ACTION_CLOSE                    // 主动关闭流
	ACTION_LASTLEAVE                // 最后一个订阅者离开
	ACTION_FIRSTENTER               // 第一个订阅者进入
)

type StreamState

type StreamState byte
const (
	STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
	STATE_PUBLISHING                     // 正在发布流状态
	STATE_WAITCLOSE                      // 等待关闭状态(自动关闭延时开启)
	STATE_CLOSED                         // 流已关闭,不可使用
)

四状态机

type StreamTimeoutConfig

type StreamTimeoutConfig struct {
	WaitTimeout      time.Duration
	PublishTimeout   time.Duration
	WaitCloseTimeout time.Duration
}

type Subscriber

type Subscriber struct {
	IO[config.Subscribe, ISubscriber]
	TrackPlayer
}

Subscriber 订阅者实体定义

func (*Subscriber) AddTrack

func (s *Subscriber) AddTrack(t Track) bool

func (*Subscriber) IsPlaying

func (s *Subscriber) IsPlaying() bool

func (*Subscriber) OnEvent

func (s *Subscriber) OnEvent(event any)

func (*Subscriber) Play

func (s *Subscriber) Play(spesic ISubscriber) func() error

非阻塞式读取,通过反复调用返回的函数可以尝试读取数据,读取到数据后会调用OnEvent,这种模式自由的在不同的goroutine中调用

func (*Subscriber) PlayBlock

func (s *Subscriber) PlayBlock(spesic ISubscriber)

PlayBlock 阻塞式读取数据

func (*Subscriber) Stop

func (s *Subscriber) Stop()

type Summary

type Summary struct {
	Address string
	Memory  struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	CPUUsage float64
	HardDisk struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	NetWork []NetWorkInfo
	Streams []*Stream
	// contains filtered or unexported fields
}

ServerSummary 系统摘要定义

func (*Summary) Add

func (s *Summary) Add()

Add 增加订阅者

func (*Summary) Done

func (s *Summary) Done()

Done 删除订阅者

func (*Summary) Point

func (s *Summary) Point() *Summary

func (*Summary) Report

func (s *Summary) Report(slave *Summary)

Report 上报数据

func (*Summary) Running

func (s *Summary) Running() bool

Running 是否正在采集数据

func (*Summary) Start

func (s *Summary) Start()

StartSummary 开始定时采集数据,每秒一次

type TrackPlayer

type TrackPlayer struct {
	context.Context
	context.CancelFunc
	AudioTrack *track.Audio
	VideoTrack *track.Video
	// contains filtered or unexported fields
}

type TrackRemoved

type TrackRemoved struct {
	Track
}

type VideoDeConf

type VideoDeConf DecoderConfiguration[NALUSlice]

func (VideoDeConf) GetAVCC

func (a VideoDeConf) GetAVCC() net.Buffers

func (VideoDeConf) GetFLV

func (a VideoDeConf) GetFLV() net.Buffers

type VideoFrame

type VideoFrame AVFrame[NALUSlice]

func (*VideoFrame) GetAVCC

func (v *VideoFrame) GetAVCC() net.Buffers

func (*VideoFrame) GetFLV

func (v *VideoFrame) GetFLV() net.Buffers

func (*VideoFrame) GetRTP

func (v *VideoFrame) GetRTP() []*RTPFrame

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL