fsv

package
v0.0.0-...-e05d22d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: MIT Imports: 16 Imported by: 0

README

Filesystem Video (FSV)

FSV is a formalization of a directory/file hierarchy, with associated naming convention, for storing video files. We also manage the chunking of long recording streams into smaller files, and also the cleaning up of old files so that we don't exceed the storage budget.

Archive size and index/file overhead

Should we keep an in-memory index of all files in the archive?

The pros:

  1. We can maintain an ordered list, so finding all files that span a given time window is a trivial binary search.
  2. No need to ask the OS every time.
  3. (Ancillary benefit) because we don't have to ask the OS to list files every time we do a read, we can keep the naming convention extremely simple - i.e. one directory per stream.

The cons:

  1. We have to maintain an index in memory, which uses memory.
  2. If we ever wanted to implement independent reader and writer processes, then this would complicate that.

How much memory? Firstly, our data structure would need to store the filename (eg "1708584695_video.rf1i"), the start time and duration. Let's say 40 bytes per file. Let's imagine a 7 day archive, with one file for every 1000 seconds. 7 * 24 * 3600 = 604800 seconds. 604800 / 1000 = 604 files. 604 * 40 = 24160 bytes per stream. This is small enough that the pros seems to outweight the cons.

Update -- because of our rf1 multi-file architecture, we also need to store the list of tracks inside this index. So the data structure is more like 80 bytes per file. Even if we double the 24kb per stream, that's still nothing.

Write Buffer

I'm experimenting with adding a per-stream write buffer into the archive. What got me to try this is that I'm experiencing extremely slow write times on an NTFS USB spinning disc drive, mounted to a WSL VM. I've tried enabling write caching in Windows for this disk, but it doesn't make a difference. It seems to be limited to about 2 MB/s. By caching writes, we can bunch up the writes to each stream. Let's see if this makes a difference.

Yes, it does seem to make a difference, at least on my problematic USB drive.

Write Buffer v2

During testing of a cheap 128GB USB stick (FAT32), plugged into a Raspberry Pi 5, I saw a lot of dropped video packets. This is probably attributable to either the NN processing thread, or the slow disk writes to the FSV archive, and I suspect it is the latter. It seems irresponsible NOT to allow asynchronous writes of the video feeds, so I'm going to do that. If this mode is enabled, then writes always occur from a background thread. OK - I ended up making buffered writes always operate asynchronously.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrTrackNotFound = errors.New("Track not found")

Functions

func DoTimeRangesOverlap

func DoTimeRangesOverlap(start1, end1, start2, end2 time.Time) bool

func VideoFileHasVideoTrack

func VideoFileHasVideoTrack(vf VideoFile, trackName string, width, height int) bool

Returns true if the file has a video track with the given name, width and height

func VideoFileMaxTrackEndTime

func VideoFileMaxTrackEndTime(vf VideoFile) time.Time

Find the time of the last packet in the video file, from any track

Types

type Archive

type Archive struct {
	// contains filtered or unexported fields
}

Archive is a collection of zero or more video streams, rooted at the same base directory. Every sub-directory from the base holds the videos of one stream. The stream name is the directory name. Archive is not safe for use from multiple threads.

func Open

func Open(logger logs.Log, baseDir string, formats []VideoFormat, initSettings StaticSettings, settings DynamicSettings) (*Archive, error)

Open a directory of video files for reading and/or writing. The directory baseDir must exist, but it may be empty. When creating new streams, formats[0] is used, so the ordering of formats is important.

func (*Archive) AutoStatsToLog

func (a *Archive) AutoStatsToLog()

func (*Archive) Close

func (a *Archive) Close()

Close the archive. It is important to close the archive, because doing so will finalize the writing of any archive files. For example, rf1 files are oversized initially to avoid fragmentation, and closing them will shrink them back down to their final size, and update their headers with appropriate information. However, the archive is designed to withstand a hard reset, and to be able to recover as much data as possible in that event. It's just not the most efficient thing to do.

func (*Archive) GetDynamicSettings

func (a *Archive) GetDynamicSettings() DynamicSettings

func (*Archive) ListStreams

func (a *Archive) ListStreams() []*StreamInfo

Get a list of all streams in the archive, and some metadata about each stream.

func (*Archive) MaxVideoFileDuration

func (a *Archive) MaxVideoFileDuration() time.Duration

func (*Archive) Read

func (a *Archive) Read(streamName string, trackNames []string, startTime, endTime time.Time, flags ReadFlags) (map[string]*TrackReadResult, error)

Read packets from the archive. The map that is returned contains the tracks that were requested. If no packets are found, we return an empty map and a nil error.

func (*Archive) SetDynamicSettings

func (a *Archive) SetDynamicSettings(settings DynamicSettings)

func (*Archive) StreamInfo

func (a *Archive) StreamInfo(streamName string) *StreamInfo

Returns metadata about the stream, or nil if the stream is not found

func (*Archive) StreamSizes

func (a *Archive) StreamSizes() map[string]int64

Return the size of each stream

func (*Archive) TotalSize

func (a *Archive) TotalSize() int64

Return the total size of all files in the archive

func (*Archive) Write

func (a *Archive) Write(streamName string, payload map[string]TrackPayload) error

Write a payload to the archive. payload keys are track names. The payload must always include the exact same set of tracks, even if some of them have no new content to write. We use the set of tracks and their properties (eg width, height) to figure out when we need to close a file and open a new one. For example, if the user decides to enable HD recording, then the track composition would change. Such a change requires a new video file.

type DynamicSettings

type DynamicSettings struct {
	MaxArchiveSize int64 // Maximum size of all files in the archive. We will eat into old files when we need to recycle space. Zero = no limit.
}

Dynamic Settings (can be changed while running). These are settings that a user is likely to change while the system is running, so we make it possible to do so.

func DefaultDynamicSettings

func DefaultDynamicSettings() DynamicSettings

type ErrCodecSwitch

type ErrCodecSwitch struct {
	FromCodec string
	ToCodec   string
}

func (ErrCodecSwitch) Error

func (e ErrCodecSwitch) Error() string

type NALU

type NALU struct {
	PTS     time.Time
	Flags   NALUFlags
	Payload []byte
	Length  int32 // Length is only valid if Payload is nil
}

Network Abstraction Layer Unit (NALU)

func (*NALU) IsKeyFrame

func (n *NALU) IsKeyFrame() bool

type NALUFlags

type NALUFlags uint32

NALU flags

const (
	NALUFlagKeyFrame      NALUFlags = 1 // Key frame
	NALUFlagEssentialMeta NALUFlags = 2 // Essential metadata, required to initialize the decoder (eg SPS/PPS NALUs in h264/h265)
	NALUFlagAnnexB        NALUFlags = 4 // Packet has Annex-B "emulation prevention bytes" and start codes
)

We have 12 bits for flags, so maximum flag value is 1 << 11 = 2048

type ReadFlags

type ReadFlags int

Flags for reading packets

const (
	// If the requested time interval does not start on a keyframe,
	// then seek back to find the first keyframe before the requested start time.
	ReadFlagSeekBackToKeyFrame ReadFlags = 1 << iota
	// Do not read packet data. Only read packet headers.
	ReadFlagHeadersOnly
)

type StaticSettings

type StaticSettings struct {
	MaxBytesPerRead int           // Maximum number of bytes that we will return from a single Read()
	SweepInterval   time.Duration // How often we check if we need to recycle space
	// Write buffer settings
	MaxWriteBufferSize            int           // Maximum amount of memory per stream in our write buffer before we flush
	MaxWriteBufferDiscardMultiple int           // MaxWriteBufferSize * MaxWriteBufferDiscardMultiple is max buffer memory before we discard incoming writes
	MaxWriteBufferTime            time.Duration // Maximum amount of time that we'll buffer data in memory before writing it to disk

}

Static Settings (cannot be changed while archive is open). These settings cannot be changed while the archive is being used. If you want to change these, you must close and re-open the archive.

func DefaultStaticSettings

func DefaultStaticSettings() StaticSettings

func (*StaticSettings) MaxWriteBufferDiscardLimit

func (s *StaticSettings) MaxWriteBufferDiscardLimit() int

type StreamInfo

type StreamInfo struct {
	Name      string
	StartTime time.Time
	EndTime   time.Time
}

Information about a stream

type Track

type Track struct {
	Name      string
	StartTime time.Time
	Duration  time.Duration
	Codec     string
	Width     int // Only applicable to video tracks
	Height    int // Only applicable to video tracks
}

Metadata about a track

type TrackPayload

type TrackPayload struct {
	TrackType   rf1.TrackType
	Codec       string // For audio/video tracks
	VideoWidth  int    // For video tracks
	VideoHeight int    // For video tracks
	NALUs       []NALU
}

Track payload used when writing packets. Every write call has enough information to create a new track, if necessary. This allows us to have a stateless Write API.

func MakeVideoPayload

func MakeVideoPayload(codec string, width, height int, nalus []NALU) TrackPayload

func (*TrackPayload) EqualStructure

func (t *TrackPayload) EqualStructure(b *TrackPayload) bool

Returns true if all parameters except the payload is identical (eg same codec,width,height,etc)

type TrackReadResult

type TrackReadResult struct {
	Codec string
	NALS  []NALU
}

type VideoFile

type VideoFile interface {
	Close() error

	// ListTracks returns a map of track names to track metadata
	ListTracks() map[string]Track

	// Return true if the video file can still grow larger to accept the given packets.
	// Note that even if your video file is capable of storing terabytes of data in a single file,
	// you should arbitrarily cap the size at something smaller, because the sweeper deletes whole
	// video files. If your entire video history is stored in one or two files, then the rolling
	// recorder would need to delete massive chunks of history whenever it needs more space.
	HasCapacity(trackName string, nNALU int, maxPTS time.Time, combinedPayloadBytes int) bool

	// Create a new video track in the file.
	// You must do this before writing packets to the track.
	CreateVideoTrack(trackName string, timeBase time.Time, codec string, width, height int) error

	Write(trackName string, packets []NALU) error
	Read(trackName string, startTime, endTime time.Time, flags ReadFlags) ([]NALU, error)

	// Total size of the video file(s)
	Size() (int64, error)
}

VideoFile is the analog of VideoFormat, but this is an embodied handle that can be read from and written to

type VideoFileRF1

type VideoFileRF1 struct {
	File *rf1.File
}

func (*VideoFileRF1) Close

func (v *VideoFileRF1) Close() error

func (*VideoFileRF1) CreateVideoTrack

func (v *VideoFileRF1) CreateVideoTrack(trackName string, timeBase time.Time, codec string, width, height int) error

func (*VideoFileRF1) HasCapacity

func (v *VideoFileRF1) HasCapacity(trackName string, nNALU int, maxPTS time.Time, combinedPayloadBytes int) bool

func (*VideoFileRF1) ListTracks

func (v *VideoFileRF1) ListTracks() map[string]Track

func (*VideoFileRF1) Read

func (v *VideoFileRF1) Read(trackName string, startTime, endTime time.Time, flags ReadFlags) ([]NALU, error)

func (*VideoFileRF1) Size

func (v *VideoFileRF1) Size() (int64, error)

func (*VideoFileRF1) Write

func (v *VideoFileRF1) Write(trackName string, packets []NALU) error

type VideoFormat

type VideoFormat interface {
	IsVideoFile(filename string) bool
	Open(filename string) (VideoFile, error)
	Create(filename string) (VideoFile, error)
	Delete(filename string, tracks []string) error
}

A video file type must support the VideoFormat interface in order to be used by fsv.

type VideoFormatRF1

type VideoFormatRF1 struct {
}

func (*VideoFormatRF1) Create

func (f *VideoFormatRF1) Create(filename string) (VideoFile, error)

func (*VideoFormatRF1) Delete

func (f *VideoFormatRF1) Delete(filename string, tracks []string) error

func (*VideoFormatRF1) IsVideoFile

func (f *VideoFormatRF1) IsVideoFile(filename string) bool

func (*VideoFormatRF1) Open

func (f *VideoFormatRF1) Open(filename string) (VideoFile, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL