camera

package
v0.0.0-...-e05d22d Latest
Warning

This package is not in the latest version of its module.
Published: Dec 2, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Constants

const StreamSinkChanDefaultBufferSize = 10

There isn't much rhyme or reason behind this number. UPDATE: While running on an RPi5, 2 seems to be too small. We frequently get blocking on various stream sinks. Specifically, I'm seeing blocking of up to 4ms on these sinks: 'LD Decode', 'HD Ring'. So I'm raising the size of this buffer from 2 to 4. Blocking when sending to these sinks is very bad, because if it happens often enough, you end up losing incoming camera packets. UPDATE: 4 is not enough. Trying 10.

Variables

var AllCameraModels []CameraModels

AllCameraModels is a slice of all camera model names, excluding "Unknown".

Functions

func EstimateFPS

func EstimateFPS(frameIntervals []time.Duration) float64

Given a set of consecutive frame intervals, estimate the average frames per second. The value is a float64 because cameras can be configured for less than 1 FPS. The rates I've seen on Hikvision are 1/2, 1/4, 1/8 and 1/16 FPS.
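As a rough illustration of the idea, the sketch below divides the number of intervals by their total duration. It is not the package's implementation: estimateFPS and exampleFPS are hypothetical names, and returning 0 for empty input is an assumption.

```go
package main

import "time"

// estimateFPS is a hypothetical stand-in for EstimateFPS. It divides the
// number of intervals by their total duration. Returning 0 for empty or
// non-positive input is an assumption of this sketch.
func estimateFPS(frameIntervals []time.Duration) float64 {
	var total time.Duration
	for _, d := range frameIntervals {
		total += d
	}
	if len(frameIntervals) == 0 || total <= 0 {
		return 0
	}
	return float64(len(frameIntervals)) / total.Seconds()
}

// exampleFPS feeds in four 2-second intervals, i.e. a camera set to 1/2 FPS.
func exampleFPS() float64 {
	two := 2 * time.Second
	return estimateFPS([]time.Duration{two, two, two, two})
}
```

With four 2-second intervals the result is a fractional rate below 1, which is why an integer return type would not work here.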

func RunStandardStream

func RunStandardStream(stream *Stream, sink StandardStreamSink, ch StreamSinkChan)

A generic message loop that should cater for most streams

Types

type Camera

type Camera struct {
	Log        logs.Log
	Config     configdb.Camera // Copy from the config database, from the moment when the camera was created. Can be out of date if camera config has been modified since.
	LowStream  *Stream
	HighStream *Stream
	HighDumper *VideoRingBuffer
	LowDecoder *VideoDecodeReader
	LowDumper  *VideoRingBuffer
	// contains filtered or unexported fields
}

Camera represents a single physical camera, with two streams (high and low res)

func NewCamera

func NewCamera(log logs.Log, cfg configdb.Camera, ringBufferSizeBytes int) (*Camera, error)

func (*Camera) Close

func (c *Camera) Close(wg *sync.WaitGroup)

Close the camera. If wg is not nil, then you must use it to signal when all of your resources are closed.

func (*Camera) ExtractHighRes

func (c *Camera) ExtractHighRes(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Extract from <now - duration> until <now>. duration is a positive number.

func (*Camera) GetStream

func (c *Camera) GetStream(res defs.Resolution) *Stream

Get either the high or low resolution stream

func (*Camera) HighResRecordingStreamName

func (c *Camera) HighResRecordingStreamName() string

The name of the high res recording stream in the video archive

func (*Camera) ID

func (c *Camera) ID() int64

func (*Camera) LastPacketAt

func (c *Camera) LastPacketAt() time.Time

Return the time of the most recently received packet from the camera. But NOTE! We return the older of the two streams' most recent packet times. In other words, we return min(LowStream.LastPacketReceivedAt(), HighStream.LastPacketReceivedAt()). This is to work around an issue where I have a camera whose HD stream frequently fails to send any packets, but the LD stream works fine. I'm not sure if this is a bug in gortsplib, but this seems like a robust strategy to have anyway. This bug only manifests during startup. Once you start receiving packets, it generally keeps going forever.
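The min() rule can be sketched as follows. The names oldestOf, isStale and exampleStale are hypothetical, and the watchdog-style staleness check is an assumption about how a caller might use the value, not something the package documents.

```go
package main

import "time"

// oldestOf returns the earlier of two timestamps, mirroring the
// min(LowStream.LastPacketReceivedAt(), HighStream.LastPacketReceivedAt())
// rule. The function name is hypothetical.
func oldestOf(low, high time.Time) time.Time {
	if high.Before(low) {
		return high
	}
	return low
}

// isStale reports whether the quieter of the two streams has been silent for
// longer than timeout. This helper is an assumption of the sketch.
func isStale(low, high, now time.Time, timeout time.Duration) bool {
	return now.Sub(oldestOf(low, high)) > timeout
}

// exampleStale models a camera whose LD stream is healthy but whose HD stream
// has not delivered a packet for 30 seconds.
func exampleStale() bool {
	now := time.Date(2024, 12, 2, 12, 0, 0, 0, time.UTC)
	low := now.Add(-100 * time.Millisecond)
	high := now.Add(-30 * time.Second)
	return isStale(low, high, now, 10*time.Second)
}
```

Because the older timestamp wins, a stream that never starts keeps the camera looking stale even while the other stream is delivering packets.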

func (*Camera) LatestImage

func (c *Camera) LatestImage(contentType string) []byte

func (*Camera) LongLivedName

func (c *Camera) LongLivedName() string

func (*Camera) LowResRecordingStreamName

func (c *Camera) LowResRecordingStreamName() string

The name of the low res recording stream in the video archive

func (*Camera) Name

func (c *Camera) Name() string

func (*Camera) RecordingStreamName

func (c *Camera) RecordingStreamName(resolution defs.Resolution) string

The name of high/low res recording stream in the video archive

func (*Camera) Start

func (c *Camera) Start() error

type CameraModelOutputParameters

type CameraModelOutputParameters struct {
	LowResURL               string
	HighResURL              string
	PacketsAreAnnexBEncoded bool
}

func GetCameraModelParameters

func GetCameraModelParameters(model, baseURL, lowResSuffix, highResSuffix string) (*CameraModelOutputParameters, error)

Should switch to onvif!

type CameraModels

type CameraModels string
const (
	// SYNC-CAMERA-MODELS
	CameraModelUnknown     CameraModels = ""
	CameraModelHikVision   CameraModels = "HikVision"
	CameraModelJustTesting CameraModels = "JustTesting"
)

func IdentifyCameraFromHTTP

func IdentifyCameraFromHTTP(headers http.Header, body string) CameraModels

Attempt to identify the camera from the HTTP response it sends when asked for its root page (e.g. http://192.168.10.5)

type ExtractMethod

type ExtractMethod int
const (
	ExtractMethodShallowClone ExtractMethod = iota // Make a shallow copy of the packets, leaving the camera's buffer intact
	ExtractMethodDeepClone                         // Make a deep copy of the packet contents, leaving the camera's buffer intact
	ExtractMethodDrain                             // Drain the buffer, leaving the camera's buffer empty
)

A note on shallow vs deep clone: I think that when I was initially building this, I had forgotten that I was using a garbage-collected language, and so I made the Clone extraction a deep clone, which copies the packet contents. Subsequently, I realized that this is a waste of effort: we should simply use shallow clones most of the time and let the garbage collector reclaim the memory. So if in doubt, just use a shallow clone. The reason I leave both shallow and deep here explicitly is future-proofing, in case we need to be stricter about our memory consumption and account for memory use more carefully.
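The difference between the two clone methods can be illustrated with a simplified packet type. This is only a sketch: packet, shallowClone and deepClone are hypothetical names, and the real videox.VideoPacket has more fields than shown here.

```go
package main

// packet is a hypothetical stand-in for videox.VideoPacket.
type packet struct {
	Payload []byte
}

// shallowClone copies only the slice of pointers. Both slices refer to the
// same packets, and the garbage collector keeps them alive as long as
// either slice is reachable.
func shallowClone(src []*packet) []*packet {
	dst := make([]*packet, len(src))
	copy(dst, src)
	return dst
}

// deepClone also copies every payload, so the result owns its own memory.
func deepClone(src []*packet) []*packet {
	dst := make([]*packet, len(src))
	for i, p := range src {
		payload := make([]byte, len(p.Payload))
		copy(payload, p.Payload)
		dst[i] = &packet{Payload: payload}
	}
	return dst
}
```

A shallow clone costs one pointer per packet, whereas a deep clone costs the full payload size, which is the waste of effort the note refers to.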

type FullChannelPolicy

type FullChannelPolicy int

If we're sending to a channel, and it is full, what do we do?

const (
	FullChannelPolicyDrop  FullChannelPolicy = iota // If channel is full, drop packets
	FullChannelPolicyStall                          // If channel is full, write the packet anyway, knowing that we'll block
)
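The two policies map naturally onto Go's channel operations: a select with a default case for dropping, and a plain send for stalling. The sketch below assumes that mapping. The names fullChannelPolicy, policyDrop, policyStall and send are hypothetical, and the package's own send path is not shown in this documentation.

```go
package main

// fullChannelPolicy mirrors FullChannelPolicy for the purposes of this sketch.
type fullChannelPolicy int

const (
	policyDrop  fullChannelPolicy = iota // drop the packet if the channel is full
	policyStall                          // block until the receiver makes room
)

// send delivers v according to policy and reports whether it was delivered.
func send(ch chan int, v int, policy fullChannelPolicy) bool {
	if policy == policyStall {
		ch <- v // may block the sender
		return true
	}
	select {
	case ch <- v:
		return true
	default:
		return false // channel full: the packet is dropped
	}
}
```

Dropping keeps the sender responsive at the cost of losing data for that listener, while stalling preserves every packet but pushes back-pressure onto the sender.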

type RingBufferListener

type RingBufferListener struct {
	Name   string
	Chan   chan *videox.VideoPacket
	Policy FullChannelPolicy
	// contains filtered or unexported fields
}

RingBufferListener is a listener that receives video packets from the ring buffer via a channel. The name is used only for logging and debugging. The thing that uniquely identifies the listener is the channel.

type StandardStreamSink

type StandardStreamSink interface {
	// OnConnect is called by Stream.ConnectSinkAndRun().
	// You must return a channel to which all stream messages will be sent.
	OnConnect(stream *Stream) (StreamSinkChan, error)
	OnPacketRTP(packet *videox.VideoPacket) // Called by RunStandardStream(), when it receives a StreamMsgTypePacket
	Close()                                 // Called by RunStandardStream(), when it receives a StreamMsgTypeClose
}

StandardStreamSink allows you to implement an interface for receiving stream packets, instead of writing a select loop.
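The kind of loop that this interface saves you from writing might look like the sketch below. All types here are simplified, hypothetical mirrors of StreamMsgType, StreamMsg and StandardStreamSink (packets are plain ints), and runLoop is only a guess at the general shape of RunStandardStream.

```go
package main

// Hypothetical, simplified mirrors of StreamMsgType and StreamMsg.
type msgType int

const (
	msgPacket msgType = iota // new packet
	msgClose                 // no further packets will arrive
)

type msg struct {
	Type   msgType
	Packet int
}

// sink is a reduced version of StandardStreamSink, without OnConnect.
type sink interface {
	OnPacket(packet int)
	Close()
}

// runLoop dispatches messages to the sink until it sees msgClose.
func runLoop(s sink, ch chan msg) {
	for m := range ch {
		switch m.Type {
		case msgPacket:
			s.OnPacket(m.Packet)
		case msgClose:
			s.Close()
			return
		}
	}
}

// countingSink records what it receives.
type countingSink struct {
	packets int
	closed  bool
}

func (c *countingSink) OnPacket(int) { c.packets++ }
func (c *countingSink) Close()       { c.closed = true }
```

A sink implemented this way only has to supply the callbacks; the loop and the close handling live in one shared place.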

type Stream

type Stream struct {
	Log        logs.Log
	Client     *gortsplib.Client
	Ident      string // Just for logs. Simply CameraName.StreamName.
	CameraName string // Just for logs
	StreamName string // The stream name, such as "low" and "high"
	Codec      videox.Codec
	// contains filtered or unexported fields
}

Stream is a bridge between the RTSP library (gortsplib) and one or more "sink" objects. The stream understands just enough about RTSP and video codecs to be able to receive information from gortsplib, transform them into our own internal data structures, and pass them onto the sinks. For each camera, we create one stream to handle the high res video, and another stream for the low res video.

func NewStream

func NewStream(logger logs.Log, cameraName, streamName string, cameraSendsAnnexBEncoded bool) *Stream

func (*Stream) Close

func (s *Stream) Close(wg *sync.WaitGroup)

Close the stream. If wg is not nil, then you must call wg.Done() once all of your sinks have closed themselves.

func (*Stream) ConnectSink

func (s *Stream) ConnectSink(name string, sink StreamSinkChan)

Connect a sink.

Every call to ConnectSink must be accompanied by a call to RemoveSink. The usual time to do this is when receiving StreamMsgTypeClose.

This function will panic if you attempt to add the same sink twice.
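The connect/remove pairing and the duplicate check can be pictured with a small registry keyed by channel identity. This is a sketch under assumptions: registry, connect, remove and connectPanics are hypothetical names, and the real Stream presumably also guards its sink list with a lock, which is omitted here.

```go
package main

// sinkChan stands in for StreamSinkChan.
type sinkChan chan int

// registry is a hypothetical sink list keyed by channel identity.
// It is not safe for concurrent use.
type registry struct {
	sinks map[sinkChan]string
}

func newRegistry() *registry {
	return &registry{sinks: map[sinkChan]string{}}
}

// connect registers a sink and panics if the same channel is added twice.
func (r *registry) connect(name string, ch sinkChan) {
	if _, exists := r.sinks[ch]; exists {
		panic("sink already connected: " + name)
	}
	r.sinks[ch] = name
}

// remove unregisters a sink. Removing an unknown sink is a no-op here.
func (r *registry) remove(ch sinkChan) {
	delete(r.sinks, ch)
}

// connectPanics reports whether connecting ch causes a panic.
func connectPanics(r *registry, name string, ch sinkChan) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	r.connect(name, ch)
	return false
}
```

Once a sink has been removed, the same channel can be connected again without triggering the panic.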

func (*Stream) ConnectSinkAndRun

func (s *Stream) ConnectSinkAndRun(name string, sink StandardStreamSink) error

Connect a standard sink object and run it.

You don't need to call RemoveSink when using ConnectSinkAndRun. When RunStandardStream exits, it will call RemoveSink for you.

func (*Stream) Info

func (s *Stream) Info() *StreamInfo

Return the stream info, or nil if we have not yet encountered the necessary NALUs

func (*Stream) LastPacketReceivedAt

func (s *Stream) LastPacketReceivedAt() time.Time

Return the wall time of the most recently received packet

func (*Stream) Listen

func (s *Stream) Listen(address string) error

func (*Stream) RecentFrameStats

func (s *Stream) RecentFrameStats() StreamStats

Estimate the frame rate

func (*Stream) RecentFrameTimes

func (s *Stream) RecentFrameTimes() []float64

func (*Stream) RemoveSink

func (s *Stream) RemoveSink(sink StreamSinkChan)

Remove a sink

type StreamInfo

type StreamInfo struct {
	Width  int
	Height int
}

type StreamMsg

type StreamMsg struct {
	Type   StreamMsgType
	Stream *Stream
	Packet *videox.VideoPacket
}

StreamMsg is sent on a channel from the stream to a sink

type StreamMsgType

type StreamMsgType int
const (
	StreamMsgTypePacket StreamMsgType = iota // New camera packet
	StreamMsgTypeClose                       // Close yourself. There will be no further packets.
)

type StreamSinkChan

type StreamSinkChan chan StreamMsg

A stream sink is fundamentally just a channel

type StreamStats

type StreamStats struct {
	KeyframeInterval int     `json:"keyframeInterval"` // number of frames between keyframes
	FPS              float64 `json:"fps"`              // frames per second
	FrameSize        float64 `json:"frameSize"`        // average frame size in bytes
	KeyframeSize     float64 `json:"keyframeSize"`     // average key-frame size in bytes
	InterframeSize   float64 `json:"interframeSize"`   // average non-key-frame size in bytes
	FrameIntervalAvg float64 `json:"frameIntervalAvg"` // Average seconds between frames
	FrameIntervalVar float64 `json:"frameIntervalVar"` // Variance of seconds between frames
}

Average observed stats from a sample of recent frames
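FrameIntervalAvg and FrameIntervalVar are a mean and a variance over recent frame intervals. A minimal sketch of that computation follows. Whether the package uses population or sample variance is not stated, so population variance is assumed here, and meanAndVariance is a hypothetical name.

```go
package main

// meanAndVariance returns the mean and the population variance of xs.
// Both are 0 for empty input.
func meanAndVariance(xs []float64) (mean, variance float64) {
	if len(xs) == 0 {
		return 0, 0
	}
	for _, x := range xs {
		mean += x
	}
	mean /= float64(len(xs))
	for _, x := range xs {
		d := x - mean
		variance += d * d
	}
	variance /= float64(len(xs))
	return mean, variance
}
```

A variance near zero indicates evenly spaced frames, while a large variance suggests jitter or dropped frames.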

func (*StreamStats) FPSRounded

func (s *StreamStats) FPSRounded() int

func (*StreamStats) KeyframeIntervalDuration

func (s *StreamStats) KeyframeIntervalDuration() time.Duration

Time between keyframes
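One plausible way to derive this value from the struct fields is to divide KeyframeInterval by FPS. The sketch below assumes exactly that. The function names are hypothetical, and returning 0 when the frame rate is unknown is an assumption.

```go
package main

import "time"

// keyframeIntervalDuration converts a keyframe interval measured in frames
// into wall time, assuming duration = frames / fps. Returning 0 for
// fps <= 0 is an assumption of this sketch.
func keyframeIntervalDuration(keyframeInterval int, fps float64) time.Duration {
	if fps <= 0 {
		return 0
	}
	seconds := float64(keyframeInterval) / fps
	return time.Duration(seconds * float64(time.Second))
}

// exampleKeyframeMillis uses 50 frames between keyframes at 25 FPS.
func exampleKeyframeMillis() int64 {
	return keyframeIntervalDuration(50, 25).Milliseconds()
}
```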

type VideoDecodeReader

type VideoDecodeReader struct {
	Log logs.Log
	//TrackID int
	//Track   *gortsplib.TrackH264
	Decoder *videox.VideoDecoder
	// contains filtered or unexported fields
}

VideoDecodeReader decodes the video stream and emits frames. NOTE: Our lastImg is a copy of the most recent frame. This memcpy might be a substantial waste if you're decoding a high res stream and only need access to the latest frame occasionally. Such a scenario might be better served by a blocking call that waits for a new frame to be decoded, depending on the acceptable latency.

func NewVideoDecodeReader

func NewVideoDecodeReader() *VideoDecodeReader

func (*VideoDecodeReader) Close

func (r *VideoDecodeReader) Close()

func (*VideoDecodeReader) GetLastImageIfDifferent

func (r *VideoDecodeReader) GetLastImageIfDifferent(ifNotEqualTo int64) (*accel.YUVImage, int64, time.Time)

Return a copy of the latest image and its ID, if it's different to the given ID
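The ID-based check lets a poller skip work when no new frame has arrived. Below is a sketch of that pattern with a byte slice standing in for *accel.YUVImage. The frameStore type and its methods are hypothetical, and the real method also returns a timestamp.

```go
package main

import "sync"

// frameStore is a hypothetical holder for the most recently decoded frame.
type frameStore struct {
	mu  sync.Mutex
	img []byte
	id  int64
}

// put stores a copy of a new frame and bumps the frame ID.
func (f *frameStore) put(img []byte) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.img = append([]byte(nil), img...)
	f.id++
}

// getIfDifferent returns a copy of the latest frame and its ID, or (nil, 0)
// if there is no frame yet or its ID equals ifNotEqualTo.
func (f *frameStore) getIfDifferent(ifNotEqualTo int64) ([]byte, int64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.img == nil || f.id == ifNotEqualTo {
		return nil, 0
	}
	return append([]byte(nil), f.img...), f.id
}
```

A caller keeps the ID from the previous call and passes it back in, so an unchanged frame costs a mutex acquisition but no copy.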

func (*VideoDecodeReader) LastImageCopy

func (r *VideoDecodeReader) LastImageCopy() (*accel.YUVImage, int64)

Return a copy of the most recently decoded frame (or nil, if there is none available yet), and the frame ID

func (*VideoDecodeReader) LastPacketAt

func (r *VideoDecodeReader) LastPacketAt() time.Time

Return the time when the last packet was received

func (*VideoDecodeReader) OnConnect

func (r *VideoDecodeReader) OnConnect(stream *Stream) (StreamSinkChan, error)

func (*VideoDecodeReader) OnPacketRTP

func (r *VideoDecodeReader) OnPacketRTP(packet *videox.VideoPacket)

type VideoRecorder

type VideoRecorder struct {
	Log logs.Log
	// contains filtered or unexported fields
}

VideoRecorder writes incoming packets to our 'fsv' video archive. We operate on top of VideoRingBuffer, so that we can start recording at any moment, and have some history of packets to write to the archive. For event-triggered recording modes, this is vital because you always want some history that preceded the moment of the event trigger. For continuous recording modes this is not important.

func StartVideoRecorder

func StartVideoRecorder(ringBuffer *VideoRingBuffer, streamName string, archive *fsv.Archive, includeHistory time.Duration) *VideoRecorder

Create a new video recorder and start recording. This function is expected to return very quickly. Specifically, the code inside LiveCameras that starts/stops recorders holds a lock while it performs this operation, assuming that this function will return very quickly.

func (*VideoRecorder) Stop

func (r *VideoRecorder) Stop()

Stop recording. Like StartVideoRecorder(), this function is expected to return immediately.

type VideoRingBuffer

type VideoRingBuffer struct {
	Log logs.Log

	BufferLock sync.Mutex // Guards all access to Buffer
	Buffer     ringbuffer.WeightedRingT[videox.VideoPacket]
	// contains filtered or unexported fields
}

VideoRingBuffer stores incoming packets in a fixed-size ring buffer, so that we always have a bit of video history to use. This is specifically useful when recordings are triggered by events such as motion or object detection. In such a case, you always want some seconds of prior history, from the moments before the event was detected.

If you need to extract some history, and then continue receiving packets and guarantee that there is no gap in between those two, then you do this:

1. BufferLock.Lock()
2. ExtractRawBufferNoLock()
3. AddPacketListener()
4. BufferLock.Unlock()

The above sequence is what VideoRecorder uses when it starts recording.
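The reason the lock must span both the extraction and the listener registration can be seen in a much reduced model. In this sketch, ring, addPacket and snapshotAndListen are hypothetical names, packets are plain ints, and a single mutex guards both the buffer and the listener list, which may differ from how VideoRingBuffer is actually organized.

```go
package main

import "sync"

// ring is a hypothetical, much simplified stand-in for VideoRingBuffer.
type ring struct {
	lock      sync.Mutex // plays the role of BufferLock
	packets   []int
	listeners []chan int
}

// addPacket stores the packet and forwards it to all listeners under the lock.
func (r *ring) addPacket(p int) {
	r.lock.Lock()
	defer r.lock.Unlock()
	r.packets = append(r.packets, p)
	for _, l := range r.listeners {
		l <- p // listeners are assumed to be buffered in this sketch
	}
}

// snapshotAndListen extracts the history and registers a listener in a single
// critical section, so no packet can arrive between the two steps.
func (r *ring) snapshotAndListen(l chan int) []int {
	r.lock.Lock()
	defer r.lock.Unlock()
	history := append([]int(nil), r.packets...)
	r.listeners = append(r.listeners, l)
	return history
}
```

If the lock were released between the snapshot and the registration, a packet arriving in that window would appear in neither the history nor the listener channel.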

func NewVideoRingBuffer

func NewVideoRingBuffer(maxRingBufferBytes int) *VideoRingBuffer

func (*VideoRingBuffer) AddPacketListener

func (r *VideoRingBuffer) AddPacketListener(name string, c chan *videox.VideoPacket, policy FullChannelPolicy)

func (*VideoRingBuffer) Close

func (r *VideoRingBuffer) Close()

func (*VideoRingBuffer) ExtractRawBuffer

func (r *VideoRingBuffer) ExtractRawBuffer(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Take BufferLock, then call ExtractRawBufferNoLock

func (*VideoRingBuffer) ExtractRawBufferNoLock

func (r *VideoRingBuffer) ExtractRawBufferNoLock(method ExtractMethod, duration time.Duration) (*videox.PacketBuffer, error)

Extract from <video_end - duration> until <video_end>. video_end is the PTS of the most recently received packet. duration is a positive number. You must be holding BufferLock before calling this function.
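The time window is anchored at the PTS of the newest packet rather than at the wall clock. A sketch of that selection is shown below, assuming packets are stored in PTS order. The names pkt, extractTail and exampleTailLen are hypothetical, and the sketch makes no attempt at keyframe alignment, which the real function may or may not perform.

```go
package main

import "time"

// pkt is a hypothetical packet carrying only a presentation timestamp.
type pkt struct {
	PTS time.Duration
}

// extractTail returns a copy of the packets whose PTS lies within
// [videoEnd-duration, videoEnd], where videoEnd is the PTS of the last
// packet. Packets are assumed to be in PTS order.
func extractTail(buf []pkt, duration time.Duration) []pkt {
	if len(buf) == 0 {
		return nil
	}
	start := buf[len(buf)-1].PTS - duration
	i := len(buf) - 1
	for i > 0 && buf[i-1].PTS >= start {
		i--
	}
	return append([]pkt(nil), buf[i:]...)
}

// exampleTailLen asks for the last 1.5 seconds of a 4-packet buffer with
// one packet per second.
func exampleTailLen() int {
	buf := []pkt{
		{PTS: 0},
		{PTS: 1 * time.Second},
		{PTS: 2 * time.Second},
		{PTS: 3 * time.Second},
	}
	return len(extractTail(buf, 1500*time.Millisecond))
}
```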

func (*VideoRingBuffer) FindLatestIDRPacketNoLock

func (r *VideoRingBuffer) FindLatestIDRPacketNoLock() int

Scan backwards in the ring buffer to find the most recent packet containing an IDR frame. Assumes that you are holding BufferLock. Returns the index in the buffer, or -1 if none is found.
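The backward scan itself is simple. The sketch below uses a plain slice and a hypothetical framePacket type with an IsIDR flag; the real buffer is a ringbuffer.WeightedRingT and detects IDR frames from the packet contents.

```go
package main

// framePacket is a hypothetical packet with a flag marking IDR frames.
type framePacket struct {
	IsIDR bool
}

// findLatestIDR scans from newest to oldest and returns the index of the most
// recent IDR packet, or -1 if there is none.
func findLatestIDR(buf []framePacket) int {
	for i := len(buf) - 1; i >= 0; i-- {
		if buf[i].IsIDR {
			return i
		}
	}
	return -1
}
```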

func (*VideoRingBuffer) OnConnect

func (r *VideoRingBuffer) OnConnect(stream *Stream) (StreamSinkChan, error)

func (*VideoRingBuffer) OnPacketRTP

func (r *VideoRingBuffer) OnPacketRTP(packet *videox.VideoPacket)

func (*VideoRingBuffer) RemovePacketListener

func (r *VideoRingBuffer) RemovePacketListener(c chan *videox.VideoPacket)
