README
¶
m7s核心引擎
该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。仅包含最基础的功能,不含任何网络协议部分,但包含了一个插件的引入机制,其他功能均由插件实现
引擎配置
[Engine]
EnableAudio = true
EnableVideo = true
# 发布流默认过期时间单位秒
PublishTimeout = 60
# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭)
AutoCloseDelay = 10
# 启用RTP包乱序重排
RTPReorder = false
流的状态图
stateDiagram-v2
[*] --> ⌛等待发布者 : 创建
⌛等待发布者 --> 🟢正在发布 :发布
⌛等待发布者 --> 🔴已关闭 :关闭
⌛等待发布者 --> 🔴已关闭 :超时
⌛等待发布者 --> 🔴已关闭 :最后订阅者离开
🟢正在发布 --> ⌛等待发布者: 发布者断开
🟢正在发布 --> 🟡等待关闭: 最后订阅者离开
🟢正在发布 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🟢正在发布 :第一个订阅者进入
🟡等待关闭 --> 🔴已关闭 :关闭
🟡等待关闭 --> 🔴已关闭 :超时
🟡等待关闭 --> 🔴已关闭 :发布者断开
引擎的基本功能
- 引擎初始化会加载配置文件,并逐个调用插件的Run函数
- 具有发布功能的插件,新建一个Stream对象,这个Stream对象随后可以被订阅
- Stream对象中含有两个列表,一个是VideoTracks一个是AudioTracks用来存放视频数据和音频数据
- 每一个VideoTrack或者AudioTrack中包含一个RingBuffer,用来存储发布者提供的数据,同时提供订阅者访问。
- 具有订阅功能的插件,会通过GetStream函数获取到一个流,然后选择VideoTracks、AudioTracks里面的RingBuffer进行连续的读取
发布插件如何发布流
以rtmp协议为例子
stream = &engine.Stream{Type: "RTMP", StreamPath: streamPath}
if stream.Publish() {
absTs := make(map[uint32]uint32)
vt := stream.NewVideoTrack(0)
at := stream.NewAudioTrack(0)
rec_audio = func(msg *Chunk) {
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
at.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
}
rec_video = func(msg *Chunk) {
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
vt.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
}
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
}
默认会创建一个VideoTrack和一个AudioTrack 当我们接收到数据的时候就可以朝里面填充物数据了
在填充数据之前,需要获取到SPS和PPS,然后设置好,因为订阅者需要先发送这个数据 然后通过Track到Push函数将数据填充到RingBuffer里面去
订阅插件如何订阅流
sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()}
if err := sub.Subscribe(stringPath); err == nil {
vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack()
var buffer bytes.Buffer
if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil {
return
}
if vt != nil {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload)
sub.OnVideo = func(ts uint32, pack *VideoPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload)
}
}
if at != nil {
if at.CodecID == 10 {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData)
}
sub.OnAudio = func(ts uint32, pack *AudioPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload)
}
}
sub.Play(at, vt)
}
- 在发送数据前,需要先发送音视频的序列帧
Documentation
¶
Index ¶
- Variables
- func Run(ctx context.Context, configFile string) (err error)
- type AudioDeConf
- type AudioFrame
- type Client
- type ClientConfig
- type FirstConfig
- type GlobalConfig
- func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)
- func (config *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)
- func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)
- func (config *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)
- type HaveAVCC
- type HaveFLV
- type IIO
- type IO
- type IOConfig
- type IPublisher
- type IPuller
- type IPusher
- type ISubscriber
- type NetWorkInfo
- type Plugin
- func (opt *Plugin) Publish(streamPath string, pub IPublisher) error
- func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save bool) (err error)
- func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error)
- func (opt *Plugin) Save() error
- func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error
- func (opt *Plugin) Update(conf config.Config)
- type Publisher
- type Puller
- type Pusher
- type SEKick
- type SEclose
- type SEpublish
- type SEwaitClose
- type SEwaitPublish
- type StateEvent
- type Stream
- func (s *Stream) AddTrack(t Track)
- func (s *Stream) Close()
- func (r *Stream) IsClosed() bool
- func (r *Stream) NewAudioTrack() (at *track.UnknowAudio)
- func (r *Stream) NewDataTrack(locker sync.Locker) (dt *track.Data)
- func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo)
- func (s *Stream) Receive(event any) bool
- func (s *Stream) RemoveTrack(t Track)
- func (s *Stream) SSRC() uint32
- type StreamAction
- type StreamState
- type StreamTimeoutConfig
- type Subscriber
- type Summary
- type TrackPlayer
- type TrackRemoved
- type VideoDeConf
- type VideoFrame
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ExecPath = os.Args[0] ExecDir = filepath.Dir(ExecPath) // ConfigRaw 配置信息的原始数据 ConfigRaw []byte StartTime time.Time //启动时间 Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置 EngineConfig = &GlobalConfig{ Engine: config.Global, } Engine = InstallPlugin(EngineConfig) //复用安装插件逻辑,将全局配置信息注入,并启动server MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置 EventBus = make(chan any, 10) )
View Source
var ActionNames = [...]string{"publish", "timeout", "publish lost", "close", "last leave", "first enter", "no tracks"}
View Source
var BadNameErr = errors.New("Bad Name")
View Source
var NoPullConfigErr = errors.New("no pull config")
View Source
var StateNames = [...]string{"⌛", "🟢", "🟡", "🔴"}
View Source
var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ { ACTION_PUBLISH: STATE_PUBLISHING, ACTION_TIMEOUT: STATE_CLOSED, ACTION_LASTLEAVE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED, }, { ACTION_PUBLISHLOST: STATE_WAITPUBLISH, ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_CLOSE: STATE_CLOSED, }, { ACTION_PUBLISHLOST: STATE_CLOSED, ACTION_TIMEOUT: STATE_CLOSED, ACTION_FIRSTENTER: STATE_PUBLISHING, ACTION_CLOSE: STATE_CLOSED, }, {}, }
View Source
var StreamIsClosedErr = errors.New("Stream Is Closed")
Streams 所有的流集合
Functions ¶
Types ¶
type AudioDeConf ¶
type AudioDeConf DecoderConfiguration[AudioSlice]
func (AudioDeConf) GetAVCC ¶
func (a AudioDeConf) GetAVCC() net.Buffers
func (AudioDeConf) GetFLV ¶
func (a AudioDeConf) GetFLV() net.Buffers
type AudioFrame ¶
type AudioFrame AVFrame[AudioSlice]
func (*AudioFrame) GetAVCC ¶
func (a *AudioFrame) GetAVCC() net.Buffers
func (*AudioFrame) GetFLV ¶
func (a *AudioFrame) GetFLV() net.Buffers
func (*AudioFrame) GetRTP ¶
func (a *AudioFrame) GetRTP() []*RTPFrame
type Client ¶
type Client[C ClientConfig] struct { Config *C StreamPath string // 本地流标识 RemoteURL string // 远程服务器地址(用于推拉) ReConnectCount int //重连次数 }
type FirstConfig ¶
type GlobalConfig ¶
func (*GlobalConfig) API_closeStream ¶
func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_summary ¶
func (config *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) API_sysInfo ¶
func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request)
func (*GlobalConfig) ServeHTTP ¶
func (config *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request)
type IO ¶
type IO[C IOConfig, S IIO] struct { ID string Type string context.Context //不要直接设置,应当通过OnEvent传入父级Context context.CancelFunc //流关闭是关闭发布者或者订阅者 *zap.Logger StartTime time.Time //创建时间 Stream *Stream `json:"-"` io.Reader `json:"-"` io.Writer `json:"-"` io.Closer `json:"-"` Args url.Values Config *C }
func (*IO[C, S]) SetParentCtx ¶
SetParentCtx(可选)
type IPublisher ¶
type IPuller ¶
type IPuller interface { IPublisher Connect() error Pull() Reconnect() bool // contains filtered or unexported methods }
type IPusher ¶
type IPusher interface { ISubscriber Push() Connect() error Reconnect() bool // contains filtered or unexported methods }
type ISubscriber ¶
type ISubscriber interface { IIO GetConfig() *config.Subscribe IsPlaying() bool Play(ISubscriber) func() error PlayBlock(ISubscriber) Stop() // contains filtered or unexported methods }
type NetWorkInfo ¶
type NetWorkInfo struct { Name string Receive uint64 Sent uint64 ReceiveSpeed uint64 SentSpeed uint64 }
NetWorkInfo 网速信息
type Plugin ¶
type Plugin struct { context.Context `json:"-"` context.CancelFunc `json:"-"` Name string //插件名称 Config config.Plugin //插件配置 Version string //插件版本 RawConfig config.Config //配置的map形式方便查询 Modified config.Config //修改过的配置项 *zap.Logger }
Plugin 插件信息
func InstallPlugin ¶
InstallPlugin 安装插件,传入插件配置生成插件信息对象
type Publisher ¶
type Publisher struct { IO[config.Publish, IPublisher] common.AudioTrack common.VideoTrack }
type SEclose ¶
type SEclose struct {
StateEvent
}
type SEpublish ¶
type SEpublish struct {
StateEvent
}
type SEwaitClose ¶
type SEwaitClose struct {
StateEvent
}
type SEwaitPublish ¶
type SEwaitPublish struct { StateEvent Publisher IPublisher }
type StateEvent ¶
type StateEvent struct { Action StreamAction From StreamState Stream *Stream }
func (StateEvent) Next ¶
func (se StateEvent) Next() (next StreamState, ok bool)
type Stream ¶
type Stream struct { *zap.Logger StartTime time.Time //创建时间 StreamTimeoutConfig Path string Publisher IPublisher State StreamState Subscribers []ISubscriber // 订阅者 Tracks map[string]Track AppName string StreamName string // contains filtered or unexported fields }
Stream 流定义
func FilterStreams ¶
func FilterStreams[T IPublisher]() (ss []*Stream)
func (*Stream) NewAudioTrack ¶
func (r *Stream) NewAudioTrack() (at *track.UnknowAudio)
func (*Stream) NewVideoTrack ¶
func (r *Stream) NewVideoTrack() (vt *track.UnknowVideo)
如果暂时不知道编码格式可以用这个
func (*Stream) RemoveTrack ¶
func (s *Stream) RemoveTrack(t Track)
type StreamAction ¶
type StreamAction byte
const ( ACTION_PUBLISH StreamAction = iota ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到 ACTION_PUBLISHLOST // 发布者意外断开 ACTION_CLOSE // 主动关闭流 ACTION_LASTLEAVE // 最后一个订阅者离开 ACTION_FIRSTENTER // 第一个订阅者进入 )
type StreamState ¶
type StreamState byte
const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 STATE_PUBLISHING // 正在发布流状态 STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_CLOSED // 流已关闭,不可使用 )
四状态机
type StreamTimeoutConfig ¶
type Subscriber ¶
type Subscriber struct { IO[config.Subscribe, ISubscriber] TrackPlayer }
Subscriber 订阅者实体定义
func (*Subscriber) AddTrack ¶
func (s *Subscriber) AddTrack(t Track) bool
func (*Subscriber) IsPlaying ¶
func (s *Subscriber) IsPlaying() bool
func (*Subscriber) OnEvent ¶
func (s *Subscriber) OnEvent(event any)
func (*Subscriber) Play ¶
func (s *Subscriber) Play(spesic ISubscriber) func() error
非阻塞式读取,通过反复调用返回的函数可以尝试读取数据,读取到数据后会调用OnEvent,这种模式自由的在不同的goroutine中调用
func (*Subscriber) Stop ¶
func (s *Subscriber) Stop()
type Summary ¶
type Summary struct { Address string Memory struct { Total uint64 Free uint64 Used uint64 Usage float64 } CPUUsage float64 HardDisk struct { Total uint64 Free uint64 Used uint64 Usage float64 } NetWork []NetWorkInfo Streams []*Stream // contains filtered or unexported fields }
ServerSummary 系统摘要定义
type TrackPlayer ¶
type TrackRemoved ¶
type TrackRemoved struct {
Track
}
type VideoDeConf ¶
type VideoDeConf DecoderConfiguration[NALUSlice]
func (VideoDeConf) GetAVCC ¶
func (a VideoDeConf) GetAVCC() net.Buffers
func (VideoDeConf) GetFLV ¶
func (a VideoDeConf) GetFLV() net.Buffers
type VideoFrame ¶
type VideoFrame AVFrame[NALUSlice]
func (*VideoFrame) GetAVCC ¶
func (v *VideoFrame) GetAVCC() net.Buffers
func (*VideoFrame) GetFLV ¶
func (v *VideoFrame) GetFLV() net.Buffers
func (*VideoFrame) GetRTP ¶
func (v *VideoFrame) GetRTP() []*RTPFrame
Source Files
¶
Click to show internal directories.
Click to hide internal directories.