Documentation ¶
Index ¶
- Constants
- Variables
- func EstimateFPS(frameIntervals []time.Duration) float64
- func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)
- type Camera
- func (c *Camera) Close(wg *sync.WaitGroup)
- func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (c *Camera) GetStream(res defs.Resolution) *Stream
- func (c *Camera) HighResRecordingStreamName() string
- func (c *Camera) ID() int64
- func (c *Camera) LastPacketAt() time.Time
- func (c *Camera) LatestImage(contentType string) []byte
- func (c *Camera) LongLivedName() string
- func (c *Camera) LowResRecordingStreamName() string
- func (c *Camera) Name() string
- func (c *Camera) RecordingStreamName(resolution defs.Resolution) string
- func (c *Camera) Start() error
- type CameraModelOutputParameters
- type CameraModels
- type ExtractMethod
- type FullChannelPolicy
- type RingBufferListener
- type StandardStreamSink
- type Stream
- func (s *Stream) Close(wg *sync.WaitGroup)
- func (s *Stream) ConnectSink(name string, sink StreamSinkChan)
- func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error
- func (s *Stream) Info() *StreamInfo
- func (s *Stream) LastPacketReceivedAt() time.Time
- func (s *Stream) Listen(address string) error
- func (s *Stream) RecentFrameStats() StreamStats
- func (s *Stream) RecentFrameTimes() []float64
- func (s *Stream) RemoveSink(sink StreamSinkChan)
- type StreamInfo
- type StreamMsg
- type StreamMsgType
- type StreamSinkChan
- type StreamStats
- type VideoDecodeReader
- func (r *VideoDecodeReader) Close()
- func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)
- func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)
- func (r *VideoDecodeReader) LastPacketAt() time.Time
- func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)
- func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)
- type VideoRecorder
- type VideoRingBuffer
- func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)
- func (r *VideoRingBuffer) Close()
- func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
- func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int
- func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)
- func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)
- func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)
Constants ¶
const StreamSinkChanDefaultBufferSize = 10
There isn't much rhyme or reason behind this number UPDATE: While running on RPi5, 2 seems to be too small. We frequently get blocking on various stream sinks. Specifically, I'm seeing blocking of up to 4ms on these sinks: 'LD Decode', 'HD Ring'. So I'm raising the size of this buffer from 2 to 4. Blocking when sending to these sinks is very bad, because if you do it enough, you end up losing incoming camera packets. 4 is not enough. Trying 10.
Variables ¶
var AllCameraModels []CameraModels
AllCameraModels is an array of all camera model names, excluding "Unknown"
Functions ¶
func EstimateFPS ¶
Given a set of consecutive frame intervals, estimate the average frames per second. The value is a float64 because cameras can be configured for less than 1 FPS. The numbers I've seen on Hikvision are 1/2, 1/4, 1/8, 1/16
func RunStandardStream ¶
func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)
A generic message loop that should cater for most streams
Types ¶
type Camera ¶
type Camera struct { Log logs.Log Config configdb.Camera // Copy from the config database, from the moment when the camera was created. Can be out of date if camera config has been modified since. LowStream *Stream HighStream *Stream HighDumper *VideoRingBuffer LowDecoder *VideoDecodeReader LowDumper *VideoRingBuffer // contains filtered or unexported fields }
Camera represents a single physical camera, with two streams (high and low res)
func (*Camera) Close ¶
Close the camera. If wg is not nil, then you must use it to signal when all of your resources are closed.
func (*Camera) ExtractHighRes ¶
func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Extract from <now - duration> until <now>. duration is a positive number.
func (*Camera) GetStream ¶
func (c *Camera) GetStream(res defs.Resolution) *Stream
Get either the high or low resolution stream
func (*Camera) HighResRecordingStreamName ¶
The name of the high res recording stream in the video archive
func (*Camera) LastPacketAt ¶
Return the time of the most recently received packet from the camera. But NOTE! We return the oldest such packet of the two streams. In other words, we return min(LowStream.LastPacketReceivedAt(), HighStream.LastPacketReceivedAt()). This is to work around an issue where I have a camera who's HD stream frequently fails to send any packets, but the LD stream works fine. I'm not sure if this is a bug in gortsplib, but this seems like a robust strategy to have anyway. This bug only manifests during startup. Once you start receiving packets, it generally keeps going forever.
func (*Camera) LatestImage ¶
func (*Camera) LongLivedName ¶
func (*Camera) LowResRecordingStreamName ¶
The name of the low res recording stream in the video archive
func (*Camera) RecordingStreamName ¶
func (c *Camera) RecordingStreamName(resolution defs.Resolution) string
The name of high/low res recording stream in the video archive
type CameraModelOutputParameters ¶
type CameraModelOutputParameters struct { LowResURL string HighResURL string PacketsAreAnnexBEncoded bool }
func GetCameraModelParameters ¶
func GetCameraModelParameters(model, baseURL, lowResSuffix, highResSuffix string) (*CameraModelOutputParameters, error)
Should switch to onvif!
type CameraModels ¶
type CameraModels string
const ( // SYNC-CAMERA-MODELS CameraModelUnknown CameraModels = "" CameraModelHikVision CameraModels = "HikVision" CameraModelJustTesting CameraModels = "JustTesting" )
func IdentifyCameraFromHTTP ¶
func IdentifyCameraFromHTTP(headers http.Header, body string) CameraModels
Attempt to identify the camera from the HTTP response it sends when asked for it's root page (eg http://192.168.10.5)
type ExtractMethod ¶
type ExtractMethod int
const ( ExtractMethodShallowClone ExtractMethod = iota // Make a shallow copy of the packets, leaving the camera's buffer intact ExtractMethodDeepClone // Make a deep copy of the packet contents, leaving the camera's buffer intact ExtractMethodDrain // Drain the buffer, leaving the camera's buffer empty )
A note on Shallow vs Deep clone: I think that initially when I was building this, I had forgotten that I was using a garbage collected language, and so I made the Clone extraction a deep clone, where I make a copy of the packet contents. Subsequently, I realized that this is a waste of effort, we should simply use shallow clones most of the time, and let the garbage collector handle the memory sweep. So if in doubt, just use a shallow clone. The reason I leave shallow and deep here explicitly, is for a future proof, in case we need to be stricter about our memory consumption, and take more careful accounting of memory use.
type FullChannelPolicy ¶
type FullChannelPolicy int
If we're sending to a channel, and it is full, what do we do?
const ( FullChannelPolicyDrop FullChannelPolicy = iota // If channel is full, drop packets FullChannelPolicyStall // If channel is full, write the packet anyway, knowing that we'll block )
type RingBufferListener ¶
type RingBufferListener struct { Name string Chan chan *videox.VideoPacket Policy FullChannelPolicy // contains filtered or unexported fields }
RingBufferListener is a listener that receives video packets from the ring buffer via a channel. The name is used only for logging and debugging. The thing that uniquely identifies the listener is the channel.
type StandardStreamSink ¶
type StandardStreamSink interface { // OnConnect is called by Stream.ConnectSinkAndRun(). // You must return a channel to which all stream messages will be sent. OnConnect(stream *Stream) (StreamSinkChan, error) OnPacketRTP(packet *videox.VideoPacket) // Called by RunStandardStream(), when it receives a StreamMsgTypePacket Close() // Called by RunStandardStream(), when it receives a StreamMsgTypeClose }
StandardStreamSink allows you to implement an interface for receiving stream packets, instead of writing a select loop.
type Stream ¶
type Stream struct { Log logs.Log Client *gortsplib.Client Ident string // Just for logs. Simply CameraName.StreamName. CameraName string // Just for logs StreamName string // The stream name, such as "low" and "high" Codec videox.Codec // contains filtered or unexported fields }
Stream is a bridge between the RTSP library (gortsplib) and one or more "sink" objects. The stream understands just enough about RTSP and video codecs to be able to receive information from gortsplib, transform them into our own internal data structures, and pass them onto the sinks. For each camera, we create one stream to handle the high res video, and another stream for the low res video.
func (*Stream) Close ¶
Close the stream. If wg is not nil, then you must call wg.Done() once all of your sinks have closed themselves.
func (*Stream) ConnectSink ¶
func (s *Stream) ConnectSink(name string, sink StreamSinkChan)
Connect a sink.
Every call to ConnectSink must be accompanied by a call to RemoveSink. The usual time to do this is when receiving StreamMsgTypeClose.
This function will panic if you attempt to add the same sink twice.
func (*Stream) ConnectSinkAndRun ¶
func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error
Connect a standard sink object and run it.
You don't need to call RemoveSink when using ConnectSinkAndRun. When RunStandardStream exits, it will call RemoveSink for you.
func (*Stream) Info ¶
func (s *Stream) Info() *StreamInfo
Return the stream info, or nil if we have not yet encountered the necessary NALUs
func (*Stream) LastPacketReceivedAt ¶
Return the wall time of the most recently received packet
func (*Stream) RecentFrameStats ¶
func (s *Stream) RecentFrameStats() StreamStats
Estimate the frame rate
func (*Stream) RecentFrameTimes ¶
type StreamInfo ¶
type StreamMsg ¶
type StreamMsg struct { Type StreamMsgType Stream *Stream Packet *videox.VideoPacket }
StreamMsg is sent on a channel from the stream to a sink
type StreamMsgType ¶
type StreamMsgType int
const ( StreamMsgTypePacket StreamMsgType = iota // New camera packet StreamMsgTypeClose // Close yourself. There will be no further packets. )
type StreamSinkChan ¶
type StreamSinkChan chan StreamMsg
A stream sink is fundamentally just a channel
type StreamStats ¶
type StreamStats struct { KeyframeInterval int `json:"keyframeInterval"` // number of frames between keyframes FPS float64 `json:"fps"` // frames per second FrameSize float64 `json:"frameSize"` // average frame size in bytes KeyframeSize float64 `json:"keyframeSize"` // average key-frame size in bytes InterframeSize float64 `json:"interframeSize"` // average non-key-frame size in bytes FrameIntervalAvg float64 `json:"frameIntervalAvg"` // Average seconds between frames FrameIntervalVar float64 `json:"frameIntervalVar"` // Variance of seconds between frames }
Average observed stats from a sample of recent frames
func (*StreamStats) FPSRounded ¶
func (s *StreamStats) FPSRounded() int
func (*StreamStats) KeyframeIntervalDuration ¶
func (s *StreamStats) KeyframeIntervalDuration() time.Duration
Time between keyframes
type VideoDecodeReader ¶
type VideoDecodeReader struct { Log logs.Log //TrackID int //Track *gortsplib.TrackH264 Decoder *videox.VideoDecoder // contains filtered or unexported fields }
VideoDecodeReader decodes the video stream and emits frames NOTE: Our lastImg is a copy of the most recent frame. This memcpy might be a substantial waste if you're decoding a high res stream, and only need access to the latest frame occasionally. Such a scenario might be better suited by a blocking call which waits for a new frame to be decoded, depending upon the acceptable latency.
func NewVideoDecodeReader ¶
func NewVideoDecodeReader() *VideoDecodeReader
func (*VideoDecodeReader) Close ¶
func (r *VideoDecodeReader) Close()
func (*VideoDecodeReader) GetLastImageIfDifferent ¶
func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)
Return a copy of the latest image and its ID, if it's different to the given ID
func (*VideoDecodeReader) LastImageCopy ¶
func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)
Return a copy of the most recently decoded frame (or nil, if there is none available yet), and the frame ID
func (*VideoDecodeReader) LastPacketAt ¶
func (r *VideoDecodeReader) LastPacketAt() time.Time
Return the time when the last packet was received
func (*VideoDecodeReader) OnConnect ¶
func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)
func (*VideoDecodeReader) OnPacketRTP ¶
func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)
type VideoRecorder ¶
VideoRecorder writes incoming packets to our 'fsv' video archive We operate on top of VideoRingBuffer, so that we can start recording at any moment, and have some history of packets to write to the archive. For event-triggered recording modes, this is vital because you always want some history that preceded the moment of the event trigger. For continuous recording modes this is not important.
func StartVideoRecorder ¶
func StartVideoRecorder(ringBuffer *VideoRingBuffer, streamName string, archive *fsv.Archive, includeHistory time.Duration) *VideoRecorder
Create a new video recorder and start recording. This function is expected to return very quickly. Specifically, the code inside LiveCameras that starts/stops recorders holds a lock while it performs this operation, assuming that this function will return very quickly.
func (*VideoRecorder) Stop ¶
func (r *VideoRecorder) Stop()
Stop recording. Like StartVideoRecorder(), this function is expected to return immediately.
type VideoRingBuffer ¶
type VideoRingBuffer struct { Log logs.Log BufferLock sync.Mutex // Guards all access to Buffer Buffer ringbuffer.WeightedRingT[videox.VideoPacket] // contains filtered or unexported fields }
VideoRingBuffer stores incoming packets in a fixed-size ring buffer, so that we always have a bit of video history to use. This is specifically useful when recordings are triggered by events such as motion or object detection. In such a case, you always want some seconds of prior history, from the moments before the event was detected.
If you need to extract some history, and then continue receiving packets and guarantee that there is no gap in between those two, then you do this: 1. BufferLock.Lock() 2. ExtractRawBufferNoLock() 3. AddPacketListener() 4. BufferLock.Unlock()
The above sequence is what videoRecorder uses when it starts recording.
func NewVideoRingBuffer ¶
func NewVideoRingBuffer(maxRingBufferBytes int) *VideoRingBuffer
func (*VideoRingBuffer) AddPacketListener ¶
func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)
func (*VideoRingBuffer) Close ¶
func (r *VideoRingBuffer) Close()
func (*VideoRingBuffer) ExtractRawBuffer ¶
func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Take BufferLock, then call ExtractRawBufferNoLock
func (*VideoRingBuffer) ExtractRawBufferNoLock ¶
func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)
Extract from <video_end - duration> until <video_end>. video_end is the PTS of the most recently received packet. duration is a positive number. You must be holding BufferLock before calling this function.
func (*VideoRingBuffer) FindLatestIDRPacketNoLock ¶
func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int
Scan backwards in the ring buffer to find the most recent packet containing an IDR frame Assumes that you are holding BufferLock Returns the index in the buffer, or -1 if none found
func (*VideoRingBuffer) OnConnect ¶
func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)
func (*VideoRingBuffer) OnPacketRTP ¶
func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)
func (*VideoRingBuffer) RemovePacketListener ¶
func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)