Documentation ¶
Overview ¶
The core package provides the public features of oryx, for example the config, the logger and the utilities.
Index ¶
- Constants
- Variables
- func IsNormalQuit(err interface{}) bool
- func Marshal(o Marshaler, b *bytes.Buffer) (err error)
- func Marshals(o ...Marshaler) (data []byte, err error)
- func NewSrsConfCommentReader(r io.Reader) io.Reader
- func NewSrsConfDirective() *srsConfDirective
- func NewSrsConfParser(r io.Reader) *srsConfParser
- func OryxSigContributorsUrl() string
- func OryxSigHandshake() string
- func OryxSigPrimary() string
- func OryxSigRelease() string
- func OryxSigServer() string
- func OryxSigStableBranch() string
- func RandomFill(b []byte)
- func Recover(ctx Context, name string, f func() error)
- func RewriteLogger()
- func Unmarshal(o UnmarshalSizer, b *bytes.Buffer) (err error)
- func Unmarshals(b *bytes.Buffer, o ...UnmarshalSizer) (err error)
- func Version() string
- type Agent
- type Config
- func (v *Config) Conf() string
- func (v *Config) Loads(conf string) error
- func (v *Config) LogTank(level string, dw io.Writer) io.Writer
- func (v *Config) LogToFile() bool
- func (v *Config) Reload(cc *Config) (err error)
- func (v *Config) ReloadCycle(wc WorkerContainer)
- func (c *Config) SetDefaults()
- func (v *Config) Subscribe(h ReloadHandler)
- func (v *Config) Unsubscribe(h ReloadHandler)
- func (v *Config) Validate() error
- func (v *Config) Vhost(name string) (*Vhost, error)
- func (v *Config) VhostGroupMessages(vhost string) (n int, err error)
- func (v *Config) VhostRealtime(vhost string) (r bool, err error)
- type Context
- type Logger
- type Marshaler
- type Message
- type MessageMuxer
- type OpenCloser
- type Opener
- type Play
- type Quiter
- type ReloadHandler
- type UnmarshalSizer
- type Vhost
- type WorkerContainer
Examples ¶
Constants ¶
// the scopes of a config reload, passed as the scope argument of the
// ReloadHandler callbacks.
// the values are assigned by iota, so the declaration order is significant.
const (
	// global specified.
	ReloadWorkers = iota
	ReloadLog
	ReloadListen
	ReloadCpuProfile
	ReloadGcPercent
	// vhost specified.
	ReloadMwLatency
)
the scope for reload.
// the rtmp constants.
const (
	// the rtmp listen port.
	// NOTE(review): presumably the default of Config.Listen, the Loads
	// example prints 1935 after SetDefaults — confirm.
	RtmpListen = 1935
	// the name of the default vhost.
	RtmpDefaultVhost = "__defaultVhost__"
	// the name of the default app.
	RtmpDefaultApp = "__defaultApp__"
)
// the label of each log level, which is the unexported logLabel
// followed by the level name in brackets and a trailing space.
const (
	LogInfoLabel  = logLabel + "[info] "
	LogTraceLabel = logLabel + "[trace] "
	LogWarnLabel  = logLabel + "[warn] "
	LogErrorLabel = logLabel + "[error] "
)
// project info: the author.
const OryxSigAuthors = "winlin"
// project info: the code name.
// NOTE(review): presumably the code name of the release — confirm.
const OryxSigCode = "MonkeyKing"
// project info: the copyright notice.
const OryxSigCopyright = "Copyright (c) 2013-2015 Oryx(ossrs)"
// project info: the contact email.
const OryxSigEmail = "winlin@vip.126.com"
// project info: the key, the short name which OryxSigName is built from.
const OryxSigKey = "Oryx"
project info.
// project info: the license.
const OryxSigLicense = "The MIT License (MIT)"
// project info: the full name, which is OryxSigKey followed by "(SRS++)".
const OryxSigName = OryxSigKey + "(SRS++)"
// project info: the one-line product description.
const OryxSigProduct = "The go-oryx is SRS++, focus on real-time live streaming cluster."
// project info: the role.
// NOTE(review): presumably the role of this server in a deployment — confirm.
const OryxSigRole = "cluster"
// project info: the stable major version.
const OryxSigStable = 0
stable major version
// project info: the full url, which is the https scheme followed by OryxSigUrlShort.
const OryxSigUrl = "https://" + OryxSigUrlShort
// project info: the url without the scheme.
const OryxSigUrlShort = "github.com/ossrs/go-oryx"
// project info: the web site.
const OryxSigWeb = "http://ossrs.net"
Variables ¶
// the error for a channel overflow.
var OverflowError error = errors.New("system overflow")
when a channel overflows; for example, the c0c1 never overflows when the channel buffer size is set to 2.
// the quit error, used for a goroutine to return.
var QuitError error = errors.New("system quit")
the quit error, used for goroutine to return.
// the error for an io wait which timed out.
var TimeoutError error = errors.New("io timeout")
when an io wait times out.
// the error for a rtmp vhost which is not found.
var VhostNotFoundError error = errors.New("vhost not found")
when the rtmp vhost is not found.
Functions ¶
func IsNormalQuit ¶
func IsNormalQuit(err interface{}) bool
whether the object got from recover, or the returned error, can be ignored, for instance when the error is a Quit error.
func Marshal ¶
marshal the object o to b
Example ¶
marshal multiple objects to buffer.
package main import ( "bytes" "github.com/ossrs/go-oryx/core" ) func main() { // objects to marshal var x core.Marshaler // for example NewAmf0String("oryx") var y core.Marshaler // for example NewAmf0Number(1.0) var b bytes.Buffer // marshal objects to b if err := core.Marshal(x, &b); err != nil { _ = err // when error. } if err := core.Marshal(y, &b); err != nil { _ = err // when error. } _ = b.Bytes() // use the bytes contains x and y }
Output:
func NewSrsConfDirective ¶
func NewSrsConfDirective() *srsConfDirective
func NewSrsConfParser ¶
func OryxSigContributorsUrl ¶
func OryxSigContributorsUrl() string
func OryxSigHandshake ¶
func OryxSigHandshake() string
func OryxSigPrimary ¶
func OryxSigPrimary() string
func OryxSigRelease ¶
func OryxSigRelease() string
func OryxSigServer ¶
func OryxSigServer() string
func OryxSigStableBranch ¶
func OryxSigStableBranch() string
func RewriteLogger ¶
func RewriteLogger()
rewrite the label and set alias for logger. @remark for normal application, use the ocore directly.
func Unmarshal ¶
func Unmarshal(o UnmarshalSizer, b *bytes.Buffer) (err error)
unmarshal the object from b
Example ¶
unmarshal multiple objects from buffer
package main import ( "bytes" "github.com/ossrs/go-oryx/core" ) func main() { var b bytes.Buffer // read from network. var x core.UnmarshalSizer // for example Amf0String var y core.UnmarshalSizer // for example Amf0Number if err := core.Unmarshal(x, &b); err != nil { _ = err // when error. } if err := core.Unmarshal(y, &b); err != nil { _ = err // when error. } // use x and y. _ = x _ = y }
Output:
func Unmarshals ¶
func Unmarshals(b *bytes.Buffer, o ...UnmarshalSizer) (err error)
unmarshal multiple o pointers, which can be nil.
Types ¶
type Agent ¶
// the agent, which pumps messages from its source to its sink and
// can be linked with an upstream sink and a downstream source.
// @remark all methods are sync.
type Agent interface {
	// an agent is a resource manager.
	OpenCloser

	// do the agent jobs, to pump messages
	// from source to sink.
	Pump() (err error)

	// write to source, from the upstream sink.
	Write(m Message) (err error)

	// tie the source to the upstream sink.
	Tie(sink Agent) (err error)

	// destroy the link between source and the upstream sink.
	UnTie(sink Agent) (err error)

	// get the tied upstream sink of source.
	TiedSink() (sink Agent)

	// sink flow to the downstream source.
	// @remark internal api, sink.Flow(source) when source.tie(sink).
	Flow(source Agent) (err error)

	// destroy the link between sink and the downstream source.
	// NOTE(review): the original comment said "downstream sink", but the
	// parameter is the source, as for Flow — confirm.
	UnFlow(source Agent) (err error)
}
the agent contains a source, which ingests messages from the upstream sink and writes them to a channel, and finally delivers them to the downstream sink.
the arch for agent is:
  +-----upstream----+           +---downstream----+
--+-source => sink--+--(tie->)--+-source => sink--+--
  +-----------------+           +-----------------+
@remark all methods are sync, the user should never assume they are async.
type Config ¶
// the config for this application, which can be loaded from a file in
// json style; the struct tags give the json key of each field.
type Config struct {
	// the global section.
	Workers int `json:"workers"` // the number of cpus to use

	// the rtmp global section.
	Listen    int  `json:"listen"`     // the system service RTMP listen port
	Daemon    bool `json:"daemon"`     // whether enabled the daemon for unix-like os
	ChunkSize int  `json:"chunk_size"` // the output chunk size. [128, 65535].

	// the go section.
	// NOTE(review): unlike the sibling sections this one has no json tag,
	// so encoding/json uses the field name "Go" as the key — confirm
	// this is intended.
	Go struct {
		Writev     bool   `json:"writev"`      // whether use private writev.
		GcTrace    int    `json:"gc_trace"`    // the gc trace interval in seconds.
		GcInterval int    `json:"gc_interval"` // the gc interval in seconds.
		GcPercent  int    `json:"gc_percent"`  // the gc percent.
		CpuProfile string `json:"cpu_profile"` // the cpu profile file.
		MemProfile string `json:"mem_profile"` // the memory profile file.
	}

	// the log config.
	Log struct {
		Tank  string `json:"tank"`  // the log tank, file or console
		Level string `json:"level"` // the log level, info/trace/warn/error
		File  string `json:"file"`  // for log tank file, the log file path.
	} `json:"log"`

	// the heartbeat section.
	// note that the json key of Summary is "summaries".
	Heartbeat struct {
		Enabled  bool    `json:"enabled"`   // whether enable the heartbeat.
		Interval float64 `json:"interval"`  // the heartbeat interval in seconds.
		Url      string  `json:"url"`       // the url to report.
		DeviceId string  `json:"device_id"` // the device id to report.
		Summary  bool    `json:"summaries"` // whether enable the detail summary.
		Listen   int     `json:"listen"`    // the heartbeat http api listen port.
	} `json:"heartbeat"`

	// the stat section.
	// note that the json key of the section is "stats" and the json key
	// of Disks is "disk".
	Stat struct {
		Network int      `json:"network"` // the network device index to use as exported ip.
		Disks   []string `json:"disk"`    // the disks to stat.
	} `json:"stats"`

	// the debug section.
	// NOTE(review): RtmpDumpRecv presumably enables dumping the received
	// rtmp data — confirm.
	Debug struct {
		RtmpDumpRecv bool `json:"rtmp_dump_recv"`
	} `json:"debug"`

	// the vhosts section.
	Vhosts []*Vhost `json:"vhosts"`
	// contains filtered or unexported fields
}
the config for this application, which can load from file in json style, and convert to json string. @remark user can use the GsConfig object.
// the current global config.
var Conf *Config
the current global config.
func (*Config) Loads ¶
loads and validates the config from the config file.
Example ¶
package main import ( "fmt" "github.com/ossrs/go-oryx/core" ) func main() { ctx := core.NewContext() c := core.NewConfig(ctx) c.SetDefaults() //if err := c.Loads("config.json"); err != nil { // panic(err) //} fmt.Println("listen at", c.Listen) fmt.Println("workers is", c.Workers) if c.Go.GcInterval == 0 { fmt.Println("go gc use default interval.") } }
Output: listen at 1935 workers is 0 go gc use default interval.
func (*Config) LogTank ¶
get the log tank writer for specified level. the param dw is the default writer.
func (*Config) ReloadCycle ¶
func (v *Config) ReloadCycle(wc WorkerContainer)
func (*Config) SetDefaults ¶
func (c *Config) SetDefaults()
func (*Config) Subscribe ¶
func (v *Config) Subscribe(h ReloadHandler)
subscribe the reload event, when got reload event, notify all handlers.
func (*Config) Unsubscribe ¶
func (v *Config) Unsubscribe(h ReloadHandler)
func (*Config) VhostGroupMessages ¶
type Context ¶
alias the Context interface. @remark user can directly use ocore Context.
func NewContext ¶
func NewContext() Context
type Logger ¶
alias the Logger interface. @remark user can directly use ocore Logger.
the application loggers. info, the verbose info level: very detailed log, the lowest level, written to discard.
trace, the trace level, something important, the default log level, to stdout.
type Message ¶
// the message of oryx, which can be formatted as a string and
// reports the muxer it belongs to.
type Message interface {
	fmt.Stringer

	// the muxer of message.
	Muxer() MessageMuxer
}
the message for oryx, the common structure for RTMP/FLV/HLS/MP4 or any other message; it can be a media message or a control message. the message flows from one agent to another agent.
type MessageMuxer ¶
// the muxer of oryx message type, one of the MuxerXXX constants.
type MessageMuxer uint8
the muxer of oryx message type.
// the muxers of message, all of type MessageMuxer.
// the values are assigned by iota, so the declaration order is significant.
const (
	MuxerRtmp MessageMuxer = iota
	MuxerFlv
	MuxerH264
	MuxerRtsp
	MuxerTs
	MuxerAac
	MuxerMp3
)
type OpenCloser ¶
the opener and closer for resource management.
type Opener ¶
// the opener to open the resource.
type Opener interface {
	// open the resource.
	// returns a non-nil error when the open failed.
	Open() error
}
the opener to open the resource.
type Play ¶
// the play section of a vhost in config.
type Play struct {
	// the mw latency, stored under the json key "mw_latency".
	// NOTE(review): the unit is not visible here — presumably
	// milliseconds, confirm.
	//
	// the tag must be a well-formed key:"value" pair; without the closing
	// quote the whole tag is ignored and encoding/json falls back to the
	// field name "MwLatency" as the key.
	MwLatency int `json:"mw_latency"`
}
func NewConfPlay ¶
func NewConfPlay() *Play
type Quiter ¶
// the quiter, which is used for quit.
// all of its fields are unexported, so they are not shown here.
type Quiter struct {
	// contains filtered or unexported fields
}
which is used for quit. TODO: FIXME: server should use it.
type ReloadHandler ¶
// the handler of the reload events of config, with one callback
// for the global scopes and one for the vhost scopes.
type ReloadHandler interface {
	// when reload the global scopes,
	// for example, the workers, listen and log.
	// @param scope defined in const ReloadXXX.
	// @param cc the current loaded config, GsConfig.
	// @param pc the previous old config.
	OnReloadGlobal(scope int, cc, pc *Config) (err error)

	// when reload the vhost scopes,
	// for example, the Vhost.Play.MwLatency
	// @param vhost presumably the name of the reloaded vhost — confirm.
	// @param scope defined in const ReloadXXX.
	// @param cc the current loaded config, GsConfig.
	// @param pc the previous old config.
	OnReloadVhost(vhost string, scope int, cc, pc *Config) (err error)
}
the reload handler: a client which cares about the reload event must implement this interface and then register itself to the config.
type UnmarshalSizer ¶
// unmarshaler and sizer, an object which can be unmarshaled from
// binary data and reports its size in bytes.
type UnmarshalSizer interface {
	encoding.BinaryUnmarshaler

	// the total size of bytes for this amf0 instance.
	Size() int
}
unmarshaler and sizer.
type Vhost ¶
type Vhost struct { Name string `json:"name"` Realtime bool `json:"min_latency"` Play *Play `json:"play,ommit-empty"` }
the vhost section in config.
func NewConfVhost ¶
func NewConfVhost() *Vhost
type WorkerContainer ¶
// the container for all workers, which provides the quit channel,
// the quit notification and the fork of goroutines.
type WorkerContainer interface {
	// get the quit channel,
	// worker can fetch the quit signal.
	// please use Quit to notify the container to quit.
	QC() <-chan bool

	// notify the container to quit.
	// for example, when goroutine fatal error,
	// which can't be recover, notify server to cleanup and quit.
	// @remark when got quit signal, the goroutine must notify the
	//      container to Quit(), for which others goroutines wait.
	// @remark this quit always return a core.QuitError error, which can be ignore.
	Quit() (err error)

	// fork a new goroutine with work container.
	// the param f can be a global func or object method.
	// the param name is the goroutine name.
	GFork(name string, f func(WorkerContainer))
}
the container for all worker, which provides the quit and cleanup methods.
Example (Fatal) ¶
the goroutine cycle notifies the container to quit on error.
package main import ( "github.com/ossrs/go-oryx/core" "time" ) func main() { var wc core.WorkerContainer wc.GFork("myservice", func(wc core.WorkerContainer) { for { select { case <-time.After(3 * time.Second): // select other channel, do something cycle to get error. if err := error(nil); err != nil { // when got none-recoverable error, notify container to quit. wc.Quit() return } case <-wc.QC(): // when got a quit signal, break the loop. // and must notify the container again for other workers // in container to quit. wc.Quit() return } } }) }
Output:
Example (Recoverable) ¶
the goroutine cycle ignores any error.
package main import ( "github.com/ossrs/go-oryx/core" "time" ) func main() { var wc core.WorkerContainer wc.GFork("myservice", func(wc core.WorkerContainer) { for { select { case <-time.After(3 * time.Second): // select other channel, do something cycle to get error. if err := error(nil); err != nil { // recoverable error, log it only and continue or return. continue } case <-wc.QC(): // when got a quit signal, break the loop. // and must notify the container again for other workers // in container to quit. wc.Quit() return } } }) }
Output:
Example (Safe) ¶
the goroutine cycle is absolutely safe: no panic and no error makes it quit.
package main import ( "github.com/ossrs/go-oryx/core" "time" ) func main() { var wc core.WorkerContainer wc.GFork("myservice", func(wc core.WorkerContainer) { defer func() { if r := recover(); r != nil { // log the r and ignore. return } }() for { select { case <-time.After(3 * time.Second): // select other channel, do something cycle to get error. if err := error(nil); err != nil { // recoverable error, log it only and continue or return. continue } case <-wc.QC(): // when got a quit signal, break the loop. // and must notify the container again for other workers // in container to quit. wc.Quit() return } } }) }
Output: