Documentation
¶
Overview ¶
Mostly taken from github.com/Workiva/go-datastructures.
Index ¶
- Constants
- Variables
- func ExecuteInParallel(q *Queue, fn func(interface{}))
- type AppData
- type BasicHLSVideoManifest
- func (m *BasicHLSVideoManifest) AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error
- func (m *BasicHLSVideoManifest) DeleteVideoStream(strmID string) error
- func (m *BasicHLSVideoManifest) GetManifest() (*m3u8.MasterPlaylist, error)
- func (m *BasicHLSVideoManifest) GetManifestID() string
- func (m *BasicHLSVideoManifest) GetStreamVariant(strmID string) (*m3u8.Variant, error)
- func (m *BasicHLSVideoManifest) GetVideoFormat() VideoFormat
- func (m *BasicHLSVideoManifest) GetVideoStream(strmID string) (HLSVideoStream, error)
- func (m *BasicHLSVideoManifest) GetVideoStreams() []HLSVideoStream
- func (m BasicHLSVideoManifest) String() string
- type BasicHLSVideoStream
- func (s *BasicHLSVideoStream) AddHLSSegment(seg *HLSSegment) error
- func (s *BasicHLSVideoStream) AppData() AppData
- func (s *BasicHLSVideoStream) End()
- func (s *BasicHLSVideoStream) GetHLSSegment(segName string) (*HLSSegment, error)
- func (s *BasicHLSVideoStream) GetStreamFormat() VideoFormat
- func (s *BasicHLSVideoStream) GetStreamID() string
- func (s *BasicHLSVideoStream) GetStreamPlaylist() (*m3u8.MediaPlaylist, error)
- func (s *BasicHLSVideoStream) SetSubscriber(f func(seg *HLSSegment, eof bool))
- func (s BasicHLSVideoStream) String() string
- type BasicRTMPVideoStream
- func (s *BasicRTMPVideoStream) AppData() AppData
- func (s *BasicRTMPVideoStream) Close()
- func (s *BasicRTMPVideoStream) GetStreamFormat() VideoFormat
- func (s *BasicRTMPVideoStream) GetStreamID() string
- func (s BasicRTMPVideoStream) Height() int
- func (s *BasicRTMPVideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) (eof chan struct{}, err error)
- func (s BasicRTMPVideoStream) String() string
- func (s BasicRTMPVideoStream) Width() int
- func (s *BasicRTMPVideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) (eof chan struct{}, err error)
- type Broadcaster
- type HLSDemuxer
- type HLSMuxer
- type HLSSegment
- type HLSVideoManifest
- type HLSVideoStream
- type Queue
- func (q *Queue) Dispose() []interface{}
- func (q *Queue) Disposed() bool
- func (q *Queue) Empty() bool
- func (q *Queue) Get(number int64) ([]interface{}, error)
- func (q *Queue) Len() int64
- func (q *Queue) Peek() (interface{}, error)
- func (q *Queue) Poll(ctx context.Context, number int64, timeout time.Duration) ([]interface{}, error)
- func (q *Queue) Put(items ...interface{}) error
- func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error)
- type RTMPEOF
- type RTMPVideoStream
- type Subscriber
- type VideoFormat
- type VideoManifest
- type VideoStream
Constants ¶
const DefaultHLSStreamCap = uint(500)
const DefaultHLSStreamWin = uint(3)
const DefaultSegWaitTime = time.Second * 10
const DefaultMediaWinLen = uint(5)
const SegWaitInterval = time.Second
Variables ¶
var ( // ErrDisposed is returned when an operation is performed on a disposed // queue. ErrDisposed = errors.New(`queue: disposed`) // ErrTimeout is returned when an applicable queue operation times out. ErrTimeout = errors.New(`queue: poll timed out`) // ErrEmptyQueue is returned when a non-applicable queue operation was called // due to the queue's empty item state ErrEmptyQueue = errors.New(`queue: empty queue`) )
var ( HLS = MakeVideoFormatType(avFormatTypeMagic + 1) RTMP = MakeVideoFormatType(avFormatTypeMagic + 2) DASH = MakeVideoFormatType(avFormatTypeMagic + 3) )
var ErrAddHLSSegment = errors.New("ErrAddHLSSegment")
var ErrBadHLSBuffer = errors.New("BadHLSBuffer")
var ErrBufferEmpty = errors.New("Stream Buffer Empty")
var ErrBufferFull = errors.New("Stream Buffer Full")
var ErrBufferItemType = errors.New("Buffer Item Type Not Recognized")
var ErrDroppedRTMPStream = errors.New("RTMP Stream Stopped Without EOF")
var ErrEOF = errors.New("ErrEOF")
var ErrHttpReqFailed = errors.New("Http Request Failed")
var ErrNotFound = errors.New("Not Found")
var ErrVideoManifest = errors.New("ErrVideoManifest")
Functions ¶
func ExecuteInParallel ¶
func ExecuteInParallel(q *Queue, fn func(interface{}))
ExecuteInParallel will (in parallel) call the provided function with each item in the queue until the queue is exhausted. When the queue is exhausted execution is complete and all goroutines will be killed. This means that the queue will be disposed, so it cannot be used again.
Types ¶
type BasicHLSVideoManifest ¶
type BasicHLSVideoManifest struct {
// contains filtered or unexported fields
}
func NewBasicHLSVideoManifest ¶
func NewBasicHLSVideoManifest(id string) *BasicHLSVideoManifest
func (*BasicHLSVideoManifest) AddVideoStream ¶
func (m *BasicHLSVideoManifest) AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error
func (*BasicHLSVideoManifest) DeleteVideoStream ¶
func (m *BasicHLSVideoManifest) DeleteVideoStream(strmID string) error
func (*BasicHLSVideoManifest) GetManifest ¶
func (m *BasicHLSVideoManifest) GetManifest() (*m3u8.MasterPlaylist, error)
func (*BasicHLSVideoManifest) GetManifestID ¶
func (m *BasicHLSVideoManifest) GetManifestID() string
func (*BasicHLSVideoManifest) GetStreamVariant ¶
func (m *BasicHLSVideoManifest) GetStreamVariant(strmID string) (*m3u8.Variant, error)
func (*BasicHLSVideoManifest) GetVideoFormat ¶
func (m *BasicHLSVideoManifest) GetVideoFormat() VideoFormat
func (*BasicHLSVideoManifest) GetVideoStream ¶
func (m *BasicHLSVideoManifest) GetVideoStream(strmID string) (HLSVideoStream, error)
func (*BasicHLSVideoManifest) GetVideoStreams ¶
func (m *BasicHLSVideoManifest) GetVideoStreams() []HLSVideoStream
func (BasicHLSVideoManifest) String ¶
func (m BasicHLSVideoManifest) String() string
type BasicHLSVideoStream ¶
type BasicHLSVideoStream struct {
// contains filtered or unexported fields
}
BasicHLSVideoStream is a basic implementation of HLSVideoStream
func NewBasicHLSVideoStream ¶
func NewBasicHLSVideoStream(strmID string, wSize uint) *BasicHLSVideoStream
func (*BasicHLSVideoStream) AddHLSSegment ¶
func (s *BasicHLSVideoStream) AddHLSSegment(seg *HLSSegment) error
AddHLSSegment adds the hls segment to the right stream
func (*BasicHLSVideoStream) AppData ¶
func (s *BasicHLSVideoStream) AppData() AppData
func (*BasicHLSVideoStream) End ¶
func (s *BasicHLSVideoStream) End()
func (*BasicHLSVideoStream) GetHLSSegment ¶
func (s *BasicHLSVideoStream) GetHLSSegment(segName string) (*HLSSegment, error)
GetHLSSegment gets the HLS segment. It blocks until something is found, or a timeout occurs.
func (*BasicHLSVideoStream) GetStreamFormat ¶
func (s *BasicHLSVideoStream) GetStreamFormat() VideoFormat
GetStreamFormat always returns HLS
func (*BasicHLSVideoStream) GetStreamID ¶
func (s *BasicHLSVideoStream) GetStreamID() string
GetStreamID returns the streamID
func (*BasicHLSVideoStream) GetStreamPlaylist ¶
func (s *BasicHLSVideoStream) GetStreamPlaylist() (*m3u8.MediaPlaylist, error)
GetStreamPlaylist returns the media playlist represented by the streamID
func (*BasicHLSVideoStream) SetSubscriber ¶
func (s *BasicHLSVideoStream) SetSubscriber(f func(seg *HLSSegment, eof bool))
SetSubscriber sets the callback function that will be called when a new hls segment is inserted
func (BasicHLSVideoStream) String ¶
func (s BasicHLSVideoStream) String() string
type BasicRTMPVideoStream ¶
type BasicRTMPVideoStream struct { EOF chan struct{} RTMPTimeout time.Duration // contains filtered or unexported fields }
func NewBasicRTMPVideoStream ¶
func NewBasicRTMPVideoStream(data AppData) *BasicRTMPVideoStream
NewBasicRTMPVideoStream creates a new BasicRTMPVideoStream. The default RTMPTimeout is set to 10 milliseconds because we assume all RTMP streams are local.
func (*BasicRTMPVideoStream) AppData ¶
func (s *BasicRTMPVideoStream) AppData() AppData
func (*BasicRTMPVideoStream) Close ¶
func (s *BasicRTMPVideoStream) Close()
func (*BasicRTMPVideoStream) GetStreamFormat ¶
func (s *BasicRTMPVideoStream) GetStreamFormat() VideoFormat
func (*BasicRTMPVideoStream) GetStreamID ¶
func (s *BasicRTMPVideoStream) GetStreamID() string
func (BasicRTMPVideoStream) Height ¶
func (s BasicRTMPVideoStream) Height() int
func (*BasicRTMPVideoStream) ReadRTMPFromStream ¶
func (s *BasicRTMPVideoStream) ReadRTMPFromStream(ctx context.Context, dst av.MuxCloser) (eof chan struct{}, err error)
ReadRTMPFromStream reads the content from the RTMP stream out into the dst.
func (BasicRTMPVideoStream) String ¶
func (s BasicRTMPVideoStream) String() string
func (BasicRTMPVideoStream) Width ¶
func (s BasicRTMPVideoStream) Width() int
func (*BasicRTMPVideoStream) WriteRTMPToStream ¶
func (s *BasicRTMPVideoStream) WriteRTMPToStream(ctx context.Context, src av.DemuxCloser) (eof chan struct{}, err error)
WriteRTMPToStream writes a video stream from src into the stream.
type Broadcaster ¶
type Broadcaster interface { Broadcast(seqNo uint64, data []byte) error IsLive() bool Finish() error String() string }
Broadcaster takes a streamID and a reader, and broadcasts the data to whatever underlying network. Note the data param doesn't have to be the raw data. The implementation can choose to encode any struct. Example:
s := GetStream("StrmID") b := ppspp.NewBroadcaster("StrmID", s.Metadata()) for seqNo, data := range s.Segments() { b.Broadcast(seqNo, data) } b.Finish()
type HLSDemuxer ¶
type HLSSegment ¶
We couldn't just use the m3u8 definition
type HLSVideoManifest ¶
type HLSVideoManifest interface { VideoManifest GetManifest() (*m3u8.MasterPlaylist, error) GetVideoStream(strmID string) (HLSVideoStream, error) // AddVideoStream(strmID string, variant *m3u8.Variant) (HLSVideoStream, error) AddVideoStream(strm HLSVideoStream, variant *m3u8.Variant) error GetStreamVariant(strmID string) (*m3u8.Variant, error) GetVideoStreams() []HLSVideoStream DeleteVideoStream(strmID string) error }
type HLSVideoStream ¶
type HLSVideoStream interface { VideoStream GetStreamPlaylist() (*m3u8.MediaPlaylist, error) // GetStreamVariant() *m3u8.Variant GetHLSSegment(segName string) (*HLSSegment, error) AddHLSSegment(seg *HLSSegment) error SetSubscriber(f func(seg *HLSSegment, eof bool)) End() }
HLSVideoStream contains the master playlist, media playlists in it, and the segments in them. Each media playlist also has a streamID. You can only add media playlists to the stream.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is the struct responsible for tracking the state of the queue.
func (*Queue) Dispose ¶
func (q *Queue) Dispose() []interface{}
Dispose disposes of this queue and returns the items disposed. Any subsequent calls to Get or Put will return an error.
func (*Queue) Disposed ¶
Disposed returns a bool indicating if this queue has had Dispose called on it.
func (*Queue) Get ¶
Get retrieves items from the queue. If there are some items in the queue, Get will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue.
func (*Queue) Peek ¶
Peek returns the first item in the queue by value without modifying the queue.
func (*Queue) Poll ¶
func (q *Queue) Poll(ctx context.Context, number int64, timeout time.Duration) ([]interface{}, error)
Poll retrieves items from the queue. If there are some items in the queue, Poll will return a number UP TO the number passed in as a parameter. If no items are in the queue, this method will pause until items are added to the queue or the provided timeout is reached. A non-positive timeout will block until items are added. If a timeout occurs, ErrTimeout is returned.
type RTMPVideoStream ¶
type Subscriber ¶
type Subscriber interface { Subscribe(ctx context.Context, gotData func(seqNo uint64, data []byte, eof bool)) error IsLive() bool Unsubscribe() error String() string }
Subscriber subscribes to a stream defined by strmID. It returns a reader that contains the stream. Example 1:
sub, metadata := ppspp.NewSubscriber("StrmID") stream := NewStream("StrmID", metadata) ctx, cancel := context.WithCancel(context.Background()) err := sub.Subscribe(ctx, func(seqNo uint64, data []byte, eof bool){ stream.WriteSeg(seqNo, data) }) time.Sleep(time.Second * 5) cancel()
Example 2:
sub.Unsubscribe() //This is the same as calling cancel()
type VideoFormat ¶
type VideoFormat uint32
func MakeVideoFormatType ¶
func MakeVideoFormatType(base uint32) (c VideoFormat)
type VideoManifest ¶
type VideoManifest interface { GetManifestID() string GetVideoFormat() VideoFormat String() string }
type VideoStream ¶
type VideoStream interface { AppData() AppData GetStreamID() string GetStreamFormat() VideoFormat String() string }