utils

package
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2024 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
// Segment cache metric names (prefix "utils_segment_cache_").
const (
	MetricsNameUtilCacheCurrentCount = "utils_segment_cache_currently_cached"
	MetricsNameUtilCacheSegmentLen   = "utils_segment_cache_io_bytes_total"
	MetricsNameUtilCacheIOLatency    = "utils_segment_cache_io_latency_sec_total"
	MetricsNameUtilCacheIOCount      = "utils_segment_cache_io_ops_total"
)

// Segment fetcher metric names (prefix "utils_segment_fetcher_").
const (
	MetricsNameUtilFetcherSegmentLen  = "utils_segment_fetcher_read_bytes_total"
	MetricsNameUtilFetcherReadLatency = "utils_segment_fetcher_read_latency_sec_total"
	MetricsNameUtilFetcherIOCount     = "utils_segment_fetcher_read_ops_total"
)

// VOD segment management metric names (prefix "vod_segment_mgmt_").
const (
	MetricsNameVODSegmentMgmtSegmentLen = "vod_segment_mgmt_read_bytes_total"
	MetricsNameVODSegmentMgmtIOCount    = "vod_segment_mgmt_read_ops_total"
)

// VOD playlist builder metric name.
const MetricsNameVODPlaylistBuiltCount = "vod_playlist_builder_playlist_gen_total"

// Tracker monitor metric names (prefix "tracker_monitor_").
const (
	// Playlist reads and newly seen segments.
	MetricsNameTrackerMonitorPlaylistReadCount = "tracker_monitor_read_playlists_total"
	MetricsNameTrackerMonitorNewSegmentCount   = "tracker_monitor_new_segment_total"

	// Segment reads.
	MetricsNameTrackerMonitorSegmentReadLen   = "tracker_monitor_read_segment_bytes_total"
	MetricsNameTrackerMonitorSegmentReadCount = "tracker_monitor_read_segment_total"

	// Segment forwarding.
	MetricsNameTrackerMonitorSegmentForwardLen   = "tracker_monitor_forward_segment_bytes_total"
	MetricsNameTrackerMonitorSegmentForwardCount = "tracker_monitor_forward_segment_total"
)

// Forwarder sender metric names (prefix "forwarder_sender_").
const (
	MetricsNameForwarderSenderSegmentForwardLen     = "forwarder_sender_segment_bytes_total"
	MetricsNameForwarderSenderSegmentForwardLatency = "forwarder_sender_segment_send_latency_sec_total"
	MetricsNameForwarderSenderSegmentForwardCount   = "forwarder_sender_segment_total"
)

// Edge manager metric names (prefix "edge_manager_").
const (
	MetricsNameEdgeManagerSegmentReadLen       = "edge_manager_read_segment_bytes_total"
	MetricsNameEdgeManagerSegmentReadCount     = "edge_manager_read_segment_total"
	MetricsNameEdgeManagerActiveRecordingCount = "edge_manager_currently_active_recordings"
)

// Control node segment management metric names (prefix "control_segment_mgmt_").
const (
	MetricsNameControlCentralSegmentMgmtSegmentReadLen   = "control_segment_mgmt_read_bytes_total"
	MetricsNameControlCentralSegmentMgmtSegmentReadCount = "control_segment_mgmt_read_total"
)

// Control node manager metric names (prefix "control_manager_").
const (
	MetricsNameControlManagerRegisteredSourceCount    = "control_manager_registered_sources"
	MetricsNameControlManagerConnectedSourceCount     = "control_manager_connected_sources"
	MetricsNameControlManagerRegisteredRecordingCount = "control_manager_registered_recordings"
	MetricsNameControlManagerActiveRecordingCount     = "control_manager_active_recordings"
)

Metrics generated by the system

Variables

This section is empty.

Functions

func CleanupObjectKey

func CleanupObjectKey(orig string) string

CleanupObjectKey cleans up an object key string.

It removes any leading `/` from the object key.

func ReadPlaylistFile

func ReadPlaylistFile(playlistFile string) (uri string, content []string, err error)

ReadPlaylistFile parses an HLS playlist file

@param playlistFile string - playlist file name
@returns playlist file URI
@returns content of the file

Types

type Broadcaster

type Broadcaster interface {
	/*
		Broadcast broadcasts a message

			@param ctxt context.Context - execution context
			@param message interface{} - message to broadcast; any value is accepted by the
				signature (NOTE(review): how it is encoded is implementation specific — confirm)
			@returns error, if any
	*/
	Broadcast(ctxt context.Context, message interface{}) error
}

Broadcaster message broadcasting client

func NewPubSubBroadcaster

func NewPubSubBroadcaster(
	psClient goutils.PubSubClient, broadcastTopic string,
) (Broadcaster, error)

NewPubSubBroadcaster define new PubSub message broadcast client

@param psClient goutils.PubSubClient - PubSub client
@param broadcastTopic string - message broadcast PubSub topic
@returns new client

type FSEvent

type FSEvent struct {
	// Event is the original fsnotify event. It is embedded, so its fields and
	// methods are promoted onto FSEvent.
	fsnotify.Event
	// Meta is the file metadata attached to the event.
	// NOTE(review): presumably describes the file the event refers to — confirm
	// whether it can be nil (e.g. for a removed file).
	Meta fs.FileInfo
}

FSEvent wrapper around `fsnotify.Event` with metadata

type FileSystemWatcher

type FileSystemWatcher interface {
	/*
		Start begins the file system watch loop

			@param ctxt context.Context - execution context
			@param runtimeCtxt context.Context - runtime context for any background tasks
			@returns error, if any
	*/
	Start(ctxt, runtimeCtxt context.Context) error

	/*
		Stop ends the file system watch loop

			@param ctxt context.Context - execution context
			@returns error, if any
	*/
	Stop(ctxt context.Context) error

	/*
		AddPath adds a path to the list of paths to watch

			@param ctxt context.Context - execution context
			@param newPath string - new path to watch
			@returns error, if any
	*/
	AddPath(ctxt context.Context, newPath string) error

	/*
		RemovePath removes a path from the list of watched paths

			@param ctxt context.Context - execution context
			@param path string - watched path to remove
			@returns error, if any
	*/
	RemovePath(ctxt context.Context, path string) error
}

FileSystemWatcher monitor a DIR for file system changes

func NewFileSystemWatcher

func NewFileSystemWatcher(eventChan chan FSEvent) (FileSystemWatcher, error)

NewFileSystemWatcher define new FileSystemWatcher

@param eventChan chan FSEvent - the channel to return file system events
@returns watcher

type S3BulkDeleteError

type S3BulkDeleteError struct {
	// Err is the wrapped error.
	// NOTE(review): presumably the failure reported for Object — confirm.
	Err    error
	// Object identifies the object the error relates to.
	// NOTE(review): presumably the object key passed to DeleteObjects — confirm.
	Object string
}

S3BulkDeleteError is a wrapper object returned when a bulk object delete fails for a particular object

func (S3BulkDeleteError) Error

func (e S3BulkDeleteError) Error() string

Error implement the error interface

type S3Client

type S3Client interface {
	/*
		ListBuckets gets a list of available buckets at the server

			@param ctxt context.Context - execution context
			@returns list of bucket names
	*/
	ListBuckets(ctxt context.Context) ([]string, error)

	/*
		ListObjects gets a list of objects in a bucket

			@param ctxt context.Context - execution context
			@param bucket string - the bucket name
			@param prefix *string - optionally, specify the object prefix to filter on
			@returns list of bucket objects
	*/
	ListObjects(ctxt context.Context, bucket string, prefix *string) ([]string, error)

	/*
		CreateBucket creates a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - new bucket name
	*/
	CreateBucket(ctxt context.Context, bucketName string) error

	/*
		DeleteBucket deletes a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - name of the bucket to delete
	*/
	DeleteBucket(ctxt context.Context, bucketName string) error

	/*
		PutObject puts a new object into a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - target bucket name
			@param objectKey string - target object name within the bucket
			@param content []byte - object content
	*/
	PutObject(ctxt context.Context, bucketName, objectKey string, content []byte) error

	/*
		GetObject gets an object from a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - target bucket name
			@param objectKey string - target object name within the bucket
			@returns object content
	*/
	GetObject(ctxt context.Context, bucketName, objectKey string) ([]byte, error)

	/*
		DeleteObject deletes an object from a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - target bucket name
			@param objectKey string - target object name within the bucket
	*/
	DeleteObject(ctxt context.Context, bucketName, objectKey string) error

	/*
		DeleteObjects deletes a group of objects from a bucket

			@param ctxt context.Context - execution context
			@param bucketName string - target bucket name
			@param objectKeys []string - target object names within the bucket
			@returns list of S3BulkDeleteError for the objects that failed to delete
				(NOTE(review): presumably nil or empty when every delete succeeds — confirm)
	*/
	DeleteObjects(
		ctxt context.Context, bucketName string, objectKeys []string,
	) []S3BulkDeleteError
}

S3Client client for interacting with S3

func NewS3Client

func NewS3Client(config common.S3Config) (S3Client, error)

NewS3Client define new S3 operation client

@param config common.S3Config - S3 client config
@returns new client

type SegmentMetricsAgent

type SegmentMetricsAgent interface {
	/*
		RecordSegment updates metrics for a new segment

			@param segmentSize int - segment size
				(NOTE(review): presumably in bytes, as the length metric names end in
				"_bytes_total" — confirm)
			@param labels map[string]string - labels to attach to the metrics
	*/
	RecordSegment(segmentSize int, labels map[string]string)
}

SegmentMetricsAgent helper agent for writing segment related metrics

func NewSegmentMetricsAgent

func NewSegmentMetricsAgent(
	ctxt context.Context,
	collector goutils.MetricsCollector,
	lengthMetricsName, lengthMetricsHelpMsg, countMetricsName, countMetricsHelpMsg string,
	expectedLabels []string,
) (SegmentMetricsAgent, error)

NewSegmentMetricsAgent define a new SegmentMetricsAgent

@param ctxt context.Context - execution context
@param collector goutils.MetricsCollector - metrics framework client
@param lengthMetricsName string - segment length metrics name
@param lengthMetricsHelpMsg string - segment length metrics helper message
@param countMetricsName string - segment count metrics name
@param countMetricsHelpMsg string - segment count metrics helper message
@param expectedLabels []string - set of labels to attach to the metrics
@returns new metrics helper agent

type SegmentReader

type SegmentReader interface {
	/*
		ReadSegment reads one segment from the specified location

			@param ctxt context.Context - execution context
			@param segment common.VideoSegment - video segment to read
			@param returnCB SegmentReturnCallback - callback used to return the read segment back
				(NOTE(review): presumably invoked from a worker rather than before this
				call returns — confirm)
			@returns error, if any
	*/
	ReadSegment(
		ctxt context.Context, segment common.VideoSegment, returnCB SegmentReturnCallback,
	) error

	/*
		Stop stops the daemon process

			@param ctxt context.Context - execution context
			@returns error, if any
	*/
	Stop(ctxt context.Context) error
}

SegmentReader supports a daemon process which reads HLS MPEG-TS segments asynchronously

func NewSegmentReader

func NewSegmentReader(
	parentContext context.Context, workerCount int, s3 S3Client, metrics goutils.MetricsCollector,
) (SegmentReader, error)

NewSegmentReader define new SegmentReader

@param parentContext context.Context - context from which to define the worker context
@param workerCount int - number of parallel read worker to define
@param s3 S3Client - S3 client for operating against the S3 server
@param metrics goutils.MetricsCollector - metrics framework client
@returns new SegmentReader

type SegmentReturnCallback

/*
	Callback parameters:

		@param ctxt context.Context - execution context
		@param segmentID string - ID of the segment that was read
		@param content []byte - content of the segment that was read
		@returns error, if any
*/
type SegmentReturnCallback func(ctxt context.Context, segmentID string, content []byte) error

SegmentReturnCallback function signature of callback to receive a read segment

type VideoSegmentCache

type VideoSegmentCache interface {
	/*
		CacheSegment adds a video segment to the cache

			@param ctxt context.Context - execution context
			@param segment common.VideoSegmentWithData - video segment to cache
			@param ttl time.Duration - how long the entry is retained before it expires
				(NOTE(review): earlier text described this as "in seconds"; the parameter is
				a time.Duration, and the resolution honored by an implementation is not
				visible here — confirm)
	*/
	CacheSegment(ctxt context.Context, segment common.VideoSegmentWithData, ttl time.Duration) error

	/*
		PurgeSegments deletes video segments from the cache

			@param ctxt context.Context - execution context
			@param segments []common.VideoSegment - list of segments to purge
	*/
	PurgeSegments(ctxt context.Context, segments []common.VideoSegment) error

	/*
		GetSegment fetches a video segment from the cache

			@param ctxt context.Context - execution context
			@param segment common.VideoSegment - segment to read
			@returns MPEG-TS file content
	*/
	GetSegment(ctxt context.Context, segment common.VideoSegment) ([]byte, error)

	/*
		GetSegments fetches a group of video segments from the cache. The returned entries are
		what is currently available within the cache.

			@param ctxt context.Context - execution context
			@param segments []common.VideoSegment - segments to read
			@returns set of MPEG-TS file content, as a map
				(NOTE(review): the map key is presumably the segment ID — confirm)
	*/
	GetSegments(ctxt context.Context, segments []common.VideoSegment) (map[string][]byte, error)

	/*
		CacheEntryCount returns the number of cached entries

			@param ctxt context.Context - execution context
			@returns the number of cached entries
	*/
	CacheEntryCount(ctxt context.Context) (int, error)
}

VideoSegmentCache video segment cache

func NewLocalVideoSegmentCache

func NewLocalVideoSegmentCache(
	parentContext context.Context,
	retentionCheckInterval time.Duration,
	metrics goutils.MetricsCollector,
) (VideoSegmentCache, error)

NewLocalVideoSegmentCache defines a new local, in-process video segment cache for a single HLS source

@param parentContext context.Context - parent context from which a worker context is defined
	for the data retention enforcement process
@param retentionCheckInterval time.Duration - cache entry retention enforce interval
@param metrics goutils.MetricsCollector - metrics framework client
@returns new VideoSegmentCache

func NewMemcachedVideoSegmentCache

func NewMemcachedVideoSegmentCache(
	ctxt context.Context, servers []string, metrics goutils.MetricsCollector,
) (VideoSegmentCache, error)

NewMemcachedVideoSegmentCache define new memcached video segment cache

@param ctxt context.Context - execution context
@param servers []string - list of memcached servers to connect to
@param metrics goutils.MetricsCollector - metrics framework client
@returns new VideoSegmentCache

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL