engine

package module
v2.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2021 License: AGPL-3.0 Imports: 25 Imported by: 19

README

Monibuca核心引擎

该项目为Monibuca的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。仅包含最基础的功能,不含任何网络协议部分,但包含了一个插件的引入机制,其他功能均由插件实现

示意图

Documentation

Index

Constants

View Source
const (
	PLUGIN_NONE       = 0      //独立插件
	PLUGIN_SUBSCRIBER = 1      //订阅者插件
	PLUGIN_PUBLISHER  = 1 << 1 //发布者插件
	PLUGIN_HOOK       = 1 << 2 //钩子插件
	PLUGIN_APP        = 1 << 3 //应用插件
)

Variables

View Source
var (

	// ConfigRaw 配置信息的原始数据
	ConfigRaw []byte
	// Version 引擎版本号
	Version string
	// EngineInfo 引擎信息
	EngineInfo = &struct {
		Version          *string
		StartTime        time.Time //启动时间
		EnableWaitStream *bool
		RingSize         *int
	}{&Version, time.Now(), &config.EnableWaitStream, &config.RingSize}
)
View Source
var AuthHooks = make(AuthHook, 0)
View Source
var OnDropHooks = make(OnDropHook, 0)
View Source
var OnPublishHooks = make(OnPublishHook, 0)
View Source
var OnStreamClosedHooks = make(OnStreamClosedHook, 0)
View Source
var OnSubscribeHooks = make(OnSubscribeHook, 0)
View Source
var OnSummaryHooks = make(OnSummaryHook, 0)
View Source
var OnUnSubscribeHooks = make(OnUnSubscribeHook, 0)
View Source
var Plugins = make(map[string]*PluginConfig)

Plugins 所有的插件配置

View Source
var Summary = ServerSummary{}

Summary 系统摘要数据

Functions

func AddWriter

func AddWriter(wn io.Writer)

AddWriter 添加日志输出端

func InstallPlugin

func InstallPlugin(opt *PluginConfig)

InstallPlugin 安装插件

func MayBeError

func MayBeError(info error) (hasError bool)

MayBeError 优雅错误判断加日志辅助函数

func Print

func Print(v ...interface{})

Print 带颜色识别

func Printf

func Printf(format string, v ...interface{})

Printf calls Output to print to the standard logger. Arguments are handled in the manner of fmt.Printf.

func Println

func Println(v ...interface{})

Println calls Output to print to the standard logger. Arguments are handled in the manner of fmt.Println.

func Run

func Run(configFile string) (err error)

Run 启动Monibuca引擎

Types

type AuthHook

type AuthHook []func(string) error

func (AuthHook) AddHook

func (h AuthHook) AddHook(hook func(string) error)

func (AuthHook) Trigger

func (h AuthHook) Trigger(sign string) error

type ChangeStreamCmd

type ChangeStreamCmd struct {
	*Subscriber
	NewStream *Stream
}

ChangeStreamCmd 切换流命令

type Collection

type Collection struct {
	sync.Map
}

Collection 对sync.Map的包装

type ListenerConfig

type ListenerConfig struct {
	ListenAddr string
}

ListenerConfig 带有监听地址端口的插件配置类型

type LogWriter

type LogWriter struct {
	*MultiLogWriter
}

LogWriter 多端写日志类

func (*LogWriter) Write

func (w *LogWriter) Write(data []byte) (n int, err error)

type MultiLogWriter

type MultiLogWriter struct {
	sync.Map
}

func (*MultiLogWriter) Write

func (w *MultiLogWriter) Write(data []byte) (n int, err error)

type NALU added in v2.1.0

type NALU struct {
	Publisher
	// contains filtered or unexported fields
}

func (*NALU) WriteNALU added in v2.1.0

func (r *NALU) WriteNALU(ts uint32, payload []byte)

type NetWorkInfo

type NetWorkInfo struct {
	Name         string
	Receive      uint64
	Sent         uint64
	ReceiveSpeed uint64
	SentSpeed    uint64
}

NetWorkInfo 网速信息

type OnDropHook

type OnDropHook []func(s *Subscriber)

func (OnDropHook) AddHook

func (h OnDropHook) AddHook(hook func(s *Subscriber))

func (OnDropHook) Trigger

func (h OnDropHook) Trigger(s *Subscriber)

type OnPublishHook

type OnPublishHook []func(r *Stream)

func (OnPublishHook) AddHook

func (h OnPublishHook) AddHook(hook func(r *Stream))

func (OnPublishHook) Trigger

func (h OnPublishHook) Trigger(r *Stream)

type OnStreamClosedHook

type OnStreamClosedHook []func(*Stream)

func (OnStreamClosedHook) AddHook

func (h OnStreamClosedHook) AddHook(hook func(*Stream))

func (OnStreamClosedHook) Trigger

func (h OnStreamClosedHook) Trigger(v *Stream)

type OnSubscribeHook

type OnSubscribeHook []func(s *Subscriber)

func (OnSubscribeHook) AddHook

func (h OnSubscribeHook) AddHook(hook func(s *Subscriber))

func (OnSubscribeHook) Trigger

func (h OnSubscribeHook) Trigger(s *Subscriber)

type OnSummaryHook

type OnSummaryHook []func(bool)

func (OnSummaryHook) AddHook

func (h OnSummaryHook) AddHook(hook func(bool))

func (OnSummaryHook) Trigger

func (h OnSummaryHook) Trigger(v bool)

type OnUnSubscribeHook

type OnUnSubscribeHook []func(s *Subscriber)

func (OnUnSubscribeHook) AddHook

func (h OnUnSubscribeHook) AddHook(hook func(s *Subscriber))

func (OnUnSubscribeHook) Trigger

func (h OnUnSubscribeHook) Trigger(s *Subscriber)

type PluginConfig

type PluginConfig struct {
	Name      string                       //插件名称
	Type      byte                         //类型
	Config    interface{}                  //插件配置
	UIFile    *embed.FS                    //界面目录
	Version   string                       //插件版本
	Dir       string                       //插件代码路径
	Run       func()                       //插件启动函数
	HotConfig map[string]func(interface{}) //热修改配置
}

PluginConfig 插件配置定义

type Publisher

type Publisher struct {
	context.Context

	AutoUnPublish bool //	当无人订阅时自动停止发布
	*Stream
	// contains filtered or unexported fields
}

Publisher 发布者实体定义

func (*Publisher) Close

func (p *Publisher) Close()

Close 关闭发布者

func (*Publisher) Publish

func (p *Publisher) Publish(streamPath string) bool

Publish 发布者进行发布操作

func (*Publisher) Running

func (p *Publisher) Running() bool

Running 发布者是否正在发布

type Ring

type Ring struct {
	*RingItem

	Size  int
	Index int
	Flag  int32
	// contains filtered or unexported fields
}

Ring 环形缓冲,使用数组实现

func NewRing

func NewRing(exp int) (r *Ring)

NewRing 创建Ring,传入大小指数

func (Ring) Clone

func (r Ring) Clone() *Ring

Clone 克隆一个Ring

func (*Ring) Dispose added in v2.4.2

func (r *Ring) Dispose()

Clone 克隆一个Ring

func (*Ring) GetAt

func (r *Ring) GetAt(index int) *RingItem

GetAt 获取指定索引处的引用

func (*Ring) GetBuffer added in v2.0.1

func (r *Ring) GetBuffer() *bytes.Buffer

func (*Ring) GetLast

func (r *Ring) GetLast() *RingItem

GetLast 获取上一个位置的引用

func (*Ring) GetNext

func (r *Ring) GetNext() *RingItem

GetNext 获取下一个位置的引用

func (*Ring) GoBack

func (r *Ring) GoBack()

GoBack 移动到上一个位置

func (*Ring) GoNext

func (r *Ring) GoNext()

GoNext 移动到下一个位置

func (*Ring) GoTo

func (r *Ring) GoTo(index int)

GoTo 移动到指定索引处

func (*Ring) NextR

func (r *Ring) NextR()

NextR 读下一个

func (*Ring) NextW

func (r *Ring) NextW()

NextW 写下一个

func (*Ring) Timeout added in v2.1.9

func (r *Ring) Timeout() bool

Timeout 发布者是否超时了

type RingItem

type RingItem struct {
	avformat.AVPacket
	sync.WaitGroup
	*bytes.Buffer
	UpdateTime time.Time
}

type ServerSummary

type ServerSummary struct {
	Address string
	Memory  struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	CPUUsage float64
	HardDisk struct {
		Total uint64
		Free  uint64
		Used  uint64
		Usage float64
	}
	NetWork []NetWorkInfo
	Streams []*StreamInfo

	Children map[string]*ServerSummary
	// contains filtered or unexported fields
}

ServerSummary 系统摘要定义

func (*ServerSummary) Add

func (s *ServerSummary) Add()

Add 增加订阅者

func (*ServerSummary) Done

func (s *ServerSummary) Done()

Done 删除订阅者

func (*ServerSummary) Report

func (s *ServerSummary) Report(slave *ServerSummary)

Report 上报数据

func (*ServerSummary) Running

func (s *ServerSummary) Running() bool

Running 是否正在采集数据

func (*ServerSummary) StartSummary

func (s *ServerSummary) StartSummary()

StartSummary 开始定时采集数据,每秒一次

type Stream

type Stream struct {
	context.Context
	*Publisher
	StreamInfo   //可序列化,供后台查看的数据
	Control      chan interface{}
	Cancel       context.CancelFunc
	Subscribers  map[string]*Subscriber // 订阅者
	VideoTag     *AVPacket              // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
	AudioTag     *AVPacket              // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
	FirstScreen  *Ring                  //最近的关键帧位置,首屏渲染
	AVRing       *Ring                  //数据环
	WaitPub      chan struct{}          //用于订阅和等待发布者
	UseTimestamp bool                   //是否采用数据包中的时间戳
	SPS          []byte
	PPS          []byte
}

Stream 流定义

func FindStream

func FindStream(streamPath string) *Stream

FindStream 根据流路径查找流

func GetStream

func GetStream(streamPath string) (result *Stream)

GetStream 根据流路径获取流,如果不存在则创建一个新的

func (*Stream) GetBuffer added in v2.0.1

func (r *Stream) GetBuffer() *bytes.Buffer

GetBuffer 获取用于写入的缓冲区

func (*Stream) PushAudio

func (r *Stream) PushAudio(timestamp uint32, payload []byte)

PushAudio 来自发布者推送的音频

func (*Stream) PushVideo

func (r *Stream) PushVideo(timestamp uint32, payload []byte)

PushVideo 来自发布者推送的视频

func (*Stream) Run

func (r *Stream) Run()

Run 流运行

func (*Stream) Subscribe

func (r *Stream) Subscribe(s *Subscriber)

Subscribe 订阅流

func (*Stream) UnSubscribe

func (r *Stream) UnSubscribe(s *Subscriber)

UnSubscribe 取消订阅流

func (*Stream) WriteASC added in v2.1.0

func (r *Stream) WriteASC(asc []byte)

func (*Stream) WritePPS added in v2.1.0

func (r *Stream) WritePPS(pps []byte)

func (*Stream) WriteSPS added in v2.1.0

func (r *Stream) WriteSPS(sps []byte)

type StreamInfo

type StreamInfo struct {
	StreamPath     string
	StartTime      time.Time
	SubscriberInfo []*SubscriberInfo
	Type           string
	VideoInfo      struct {
		PacketCount int
		CodecID     byte
		SPSInfo     SPSInfo
		BPS         int

		GOP int //关键帧间隔
		// contains filtered or unexported fields
	}
	AudioInfo struct {
		PacketCount int
		SoundFormat byte //4bit
		SoundRate   int  //2bit
		SoundSize   byte //1bit
		SoundType   byte //1bit

		BPS int
		// contains filtered or unexported fields
	}
	HasAudio    bool
	HasVideo    bool
	EnableVideo *bool
	EnableAudio *bool
}

StreamInfo 流可序列化信息,用于控制台显示

type SubscribeCmd

type SubscribeCmd struct {
	*Subscriber
}

SubscribeCmd 订阅流命令

type Subscriber

type Subscriber struct {
	context.Context
	*Stream
	SubscriberInfo
	MetaData   func(stream *Stream) error
	OnData     func(*avformat.SendPacket) error
	Cancel     context.CancelFunc
	Sign       string
	OffsetTime uint32

	avformat.SendPacket
	// contains filtered or unexported fields
}

Subscriber 订阅者实体定义

func (*Subscriber) Close

func (s *Subscriber) Close()

Close 关闭订阅者

func (*Subscriber) IsClosed

func (s *Subscriber) IsClosed() bool

IsClosed 检查订阅者是否已经关闭

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(streamPath string) (err error)

Subscribe 开始订阅

func (*Subscriber) SubscribeWithContext added in v2.3.0

func (s *Subscriber) SubscribeWithContext(streamPath string, ctx context.Context) (err error)

SubscribeWithContext 带额外取消功能的订阅

type SubscriberInfo

type SubscriberInfo struct {
	ID            string
	TotalDrop     int //总丢帧
	TotalPacket   int
	Type          string
	BufferLength  int
	Delay         uint32
	SubscribeTime time.Time
}

SubscriberInfo 订阅者可序列化信息,用于控制台输出

type UnSubscribeCmd

type UnSubscribeCmd struct {
	*Subscriber
}

UnSubscribeCmd 取消订阅命令

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL