core

package
v0.5.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 35 Imported by: 0

Documentation

Overview

Core contains the main functionality of the Livepeer node.

The logical orgnization of the `core` module is as follows:

livepeernode.go: Main struct definition and code that is common to all node types. broadcaster.go: Code that is called only when the node is in broadcaster mode. orchestrator.go: Code that is called only when the node is in orchestrator mode.

Index

Constants

View Source
const (
	JsonPlaylistInitialTimeout = 5 * time.Second
	JsonPlaylistMaxTimeout     = 120 * time.Second
)
View Source
const (
	DefaultManifestIDLength = 4
)
View Source
const LIVE_LIST_LENGTH uint = 6

Variables

View Source
var CapabilityTestLookup = map[Capability]CapabilityTest{

	Capability_H264: {
		// contains filtered or unexported fields
	},
	Capability_HEVC_Decode: {
		// contains filtered or unexported fields
	},
	Capability_HEVC_Encode: {
		// contains filtered or unexported fields
	},
	Capability_VP8_Decode: {
		// contains filtered or unexported fields
	},
	Capability_VP9_Decode: {
		// contains filtered or unexported fields
	},
}
View Source
var DetectorProfile ffmpeg.DetectorProfile

This is for temporary convenience - as we currently only support loading a single detection model.

View Source
var ErrManifestID = errors.New("ErrManifestID")
View Source
var ErrNoCompatibleTranscodersAvailable = errors.New("no transcoders can provide requested capabilities")
View Source
var ErrNoTranscodersAvailable = errors.New("no transcoders available")
View Source
var ErrOrchBusy = errors.New("OrchestratorBusy")
View Source
var ErrOrchCap = errors.New("OrchestratorCapped")
View Source
var ErrRemoteTranscoderTimeout = errors.New("Remote transcoder took too long")
View Source
var ErrTranscode = errors.New("ErrTranscode")
View Source
var ErrTranscoderAvail = errors.New("ErrTranscoderUnavailable")
View Source
var ErrTranscoderBusy = errors.New("TranscoderBusy")
View Source
var ErrTranscoderStopped = errors.New("TranscoderStopped")
View Source
var JsonPlaylistQuitTimeout = 60 * time.Second
View Source
var LivepeerVersion = "undefined"

LivepeerVersion node version content of this constant will be set at build time, using -ldflags, combining content of `VERSION` file and output of the `git describe` command.

View Source
var MaxSessions = 10
View Source
var WorkDir string

Functions

func NetSegData added in v0.5.9

func NetSegData(md *SegTranscodingMetadata) (*net.SegData, error)

func NewBroadcaster added in v0.3.3

func NewBroadcaster(node *LivepeerNode) *broadcaster

func NewOrchestrator added in v0.3.3

func NewOrchestrator(n *LivepeerNode, rm common.RoundsManager) *orchestrator

func NewRemoteTranscoderFatalError added in v0.5.0

func NewRemoteTranscoderFatalError(err error) error

NewRemoteTranscoderFatalError creates new RemoteTranscoderFatalError Exported here to be used in other packages

Types

type AddressBalances added in v0.5.0

type AddressBalances struct {
	// contains filtered or unexported fields
}

AddressBalances holds credit balances for ETH addresses

func NewAddressBalances added in v0.5.0

func NewAddressBalances(ttl time.Duration) *AddressBalances

NewAddressBalances creates a new AddressBalances instance

func (*AddressBalances) Balance added in v0.5.0

func (a *AddressBalances) Balance(addr ethcommon.Address, id ManifestID) *big.Rat

Balance retrieves the current balance for an address' ManifestID

func (*AddressBalances) Credit added in v0.5.0

func (a *AddressBalances) Credit(addr ethcommon.Address, id ManifestID, amount *big.Rat)

Credit adds an an amount to the balance for an address' ManifestID

func (*AddressBalances) Debit added in v0.5.0

func (a *AddressBalances) Debit(addr ethcommon.Address, id ManifestID, amount *big.Rat)

Debit substracts an amount from the balance for an address' ManifestID

func (*AddressBalances) Reserve added in v0.5.0

func (a *AddressBalances) Reserve(addr ethcommon.Address, id ManifestID) *big.Rat

Reserve zeros the balance for an address' ManifestID and returns the current balance

func (*AddressBalances) StopCleanup added in v0.5.0

func (a *AddressBalances) StopCleanup()

StopCleanup stops the cleanup loop for all balances

type Balance added in v0.5.0

type Balance struct {
	// contains filtered or unexported fields
}

Balance holds the credit balance for a broadcast session

func NewBalance added in v0.5.0

func NewBalance(addr ethcommon.Address, manifestID ManifestID, balances *AddressBalances) *Balance

NewBalance returns a Balance instance

func (*Balance) Credit added in v0.5.0

func (b *Balance) Credit(amount *big.Rat)

Credit adds an amount to the balance

func (*Balance) StageUpdate added in v0.5.0

func (b *Balance) StageUpdate(minCredit, ev *big.Rat) (int, *big.Rat, *big.Rat)

StageUpdate prepares a balance update by reserving the current balance and returning the number of tickets to send with a payment, the new credit represented by the payment and the existing credit (i.e reserved balance)

type Balances added in v0.5.0

type Balances struct {
	// contains filtered or unexported fields
}

Balances holds credit balances on a per-stream basis

func NewBalances added in v0.5.0

func NewBalances(ttl time.Duration) *Balances

NewBalances creates a Balances instance with the given ttl

func (*Balances) Balance added in v0.5.0

func (b *Balances) Balance(id ManifestID) *big.Rat

Balance retrieves the current balance for a ManifestID

func (*Balances) Credit added in v0.5.0

func (b *Balances) Credit(id ManifestID, amount *big.Rat)

Credit adds an an amount to the balance for a ManifestID

func (*Balances) Debit added in v0.5.0

func (b *Balances) Debit(id ManifestID, amount *big.Rat)

Debit substracts an amount from the balance for a ManifestID

func (*Balances) Reserve added in v0.5.0

func (b *Balances) Reserve(id ManifestID) *big.Rat

Reserve zeros the balance for a ManifestID and returns the current balance

func (*Balances) StartCleanup added in v0.5.0

func (b *Balances) StartCleanup()

StartCleanup is a state flushing method to clean up the balances mapping

func (*Balances) StopCleanup added in v0.5.0

func (b *Balances) StopCleanup()

StopCleanup stops the cleanup loop for Balances

type BasicPlaylistManager added in v0.5.0

type BasicPlaylistManager struct {
	// contains filtered or unexported fields
}

func NewBasicPlaylistManager added in v0.5.0

func NewBasicPlaylistManager(manifestID ManifestID,
	storageSession, recordSession drivers.OSSession) *BasicPlaylistManager

NewBasicPlaylistManager create new BasicPlaylistManager struct

func (*BasicPlaylistManager) Cleanup added in v0.5.0

func (mgr *BasicPlaylistManager) Cleanup()

func (*BasicPlaylistManager) FlushRecord added in v0.5.13

func (mgr *BasicPlaylistManager) FlushRecord()

func (*BasicPlaylistManager) GetHLSMasterPlaylist added in v0.5.0

func (mgr *BasicPlaylistManager) GetHLSMasterPlaylist() *m3u8.MasterPlaylist

GetHLSMasterPlaylist ..

func (*BasicPlaylistManager) GetHLSMediaPlaylist added in v0.5.0

func (mgr *BasicPlaylistManager) GetHLSMediaPlaylist(rendition string) *m3u8.MediaPlaylist

GetHLSMediaPlaylist ...

func (*BasicPlaylistManager) GetOSSession added in v0.5.0

func (mgr *BasicPlaylistManager) GetOSSession() drivers.OSSession

func (*BasicPlaylistManager) GetRecordOSSession added in v0.5.13

func (mgr *BasicPlaylistManager) GetRecordOSSession() drivers.OSSession

func (*BasicPlaylistManager) InsertHLSSegment added in v0.5.0

func (mgr *BasicPlaylistManager) InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
	duration float64) error

func (*BasicPlaylistManager) InsertHLSSegmentJSON added in v0.5.13

func (mgr *BasicPlaylistManager) InsertHLSSegmentJSON(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
	duration float64)

func (*BasicPlaylistManager) ManifestID added in v0.5.0

func (mgr *BasicPlaylistManager) ManifestID() ManifestID

type Capabilities added in v0.5.10

type Capabilities struct {
	// contains filtered or unexported fields
}

func CapabilitiesFromNetCapabilities added in v0.5.10

func CapabilitiesFromNetCapabilities(caps *net.Capabilities) *Capabilities

func JobCapabilities added in v0.5.10

func JobCapabilities(params *StreamParameters) (*Capabilities, error)

func NewCapabilities added in v0.5.10

func NewCapabilities(caps []Capability, m []Capability) *Capabilities

func (*Capabilities) CompatibleWith added in v0.5.10

func (bcast *Capabilities) CompatibleWith(orch *net.Capabilities) bool

func (*Capabilities) LegacyOnly added in v0.5.10

func (bcast *Capabilities) LegacyOnly() bool

func (*Capabilities) ToNetCapabilities added in v0.5.10

func (c *Capabilities) ToNetCapabilities() *net.Capabilities

type Capability added in v0.5.10

type Capability int
const (
	Capability_Invalid Capability = iota - 2
	Capability_Unused
	Capability_H264
	Capability_MPEGTS
	Capability_MP4
	Capability_FractionalFramerates
	Capability_StorageDirect
	Capability_StorageS3
	Capability_StorageGCS
	Capability_ProfileH264Baseline
	Capability_ProfileH264Main
	Capability_ProfileH264High
	Capability_ProfileH264ConstrainedHigh
	Capability_GOP
	Capability_AuthToken
	Capability_SceneClassification
	Capability_MPEG7VideoSignature
	Capability_HEVC_Decode
	Capability_HEVC_Encode
	Capability_VP8_Decode
	Capability_VP9_Decode
	Capability_VP8_Encode
	Capability_VP9_Encode
)

Do not rearrange these values! Only append.

func DefaultCapabilities added in v0.5.19

func DefaultCapabilities() []Capability

func ExperimentalCapabilities added in v0.5.19

func ExperimentalCapabilities() []Capability

func MandatoryOCapabilities added in v0.5.28

func MandatoryOCapabilities() []Capability

func OptionalCapabilities added in v0.5.28

func OptionalCapabilities() []Capability

func TestTranscoderCapabilities added in v0.5.28

func TestTranscoderCapabilities(devices []string) (caps []Capability, fatalError error)

Test which capabilities transcoder supports

type CapabilityString added in v0.5.10

type CapabilityString []uint64

func NewCapabilityString added in v0.5.10

func NewCapabilityString(caps []Capability) CapabilityString

func (CapabilityString) CompatibleWith added in v0.5.10

func (c1 CapabilityString) CompatibleWith(c2 CapabilityString) bool

type CapabilityTest added in v0.5.28

type CapabilityTest struct {
	// contains filtered or unexported fields
}

type Constraints added in v0.5.10

type Constraints struct{}

type DetectionConfig added in v0.5.19

type DetectionConfig struct {
	Freq               uint
	SelectedClassNames []string
	Profiles           []ffmpeg.DetectorProfile
}

type JsonMediaTrack added in v0.5.13

type JsonMediaTrack struct {
	Name       string `json:"name,omitempty"`
	Bandwidth  uint32 `json:"bandwidth,omitempty"`
	Resolution string `json:"resolution,omitempty"`
}

type JsonPlaylist added in v0.5.13

type JsonPlaylist struct {
	DurationMs uint64               `json:"duration_ms,omitempty"` // total duration of the saved sagments
	Tracks     []JsonMediaTrack     `json:"tracks,omitempty"`
	Segments   map[string][]jsonSeg `json:"segments,omitempty"`
	// contains filtered or unexported fields
}

func NewJSONPlaylist added in v0.5.13

func NewJSONPlaylist() *JsonPlaylist

func (*JsonPlaylist) AddDiscontinuedTrack added in v0.5.13

func (jpl *JsonPlaylist) AddDiscontinuedTrack(ajpl *JsonPlaylist, trackName string)

AddDiscontinuedTrack appends all segments for specified rendition, mark first one as discontinued

func (*JsonPlaylist) AddMaster added in v0.5.13

func (jpl *JsonPlaylist) AddMaster(ajpl *JsonPlaylist)

AddMaster adds data about tracks

func (*JsonPlaylist) AddSegmentsToMPL added in v0.5.13

func (jpl *JsonPlaylist) AddSegmentsToMPL(manifestIDs []string, trackName string, mpl *m3u8.MediaPlaylist, extURL string)

AddSegmentsToMPL adds segments to the MediaPlaylist

func (*JsonPlaylist) AddTrack added in v0.5.13

func (jpl *JsonPlaylist) AddTrack(ajpl *JsonPlaylist, trackName string)

AddTrack adds segments data for specified rendition

func (*JsonPlaylist) InsertHLSSegment added in v0.5.13

func (jpl *JsonPlaylist) InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string,
	duration float64)

type LivepeerNode

type LivepeerNode struct {

	// Common fields
	Eth      eth.LivepeerEthClient
	WorkDir  string
	NodeType NodeType
	Database *common.DB

	// Transcoder public fields
	SegmentChans      map[ManifestID]SegmentChan
	Recipient         pm.Recipient
	OrchestratorPool  common.OrchestratorPool
	OrchSecret        string
	Transcoder        Transcoder
	TranscoderManager *RemoteTranscoderManager
	Balances          *AddressBalances
	Capabilities      *Capabilities
	AutoAdjustPrice   bool

	// Broadcaster public fields
	Sender pm.Sender
	// contains filtered or unexported fields
}

LivepeerNode handles videos going in and coming out of the Livepeer network.

func NewLivepeerNode

func NewLivepeerNode(e eth.LivepeerEthClient, wd string, dbh *common.DB) (*LivepeerNode, error)

NewLivepeerNode creates a new Livepeer Node. Eth can be nil.

func (*LivepeerNode) GetBasePrice added in v0.5.0

func (n *LivepeerNode) GetBasePrice() *big.Rat

GetBasePrice gets the base price for an orchestrator

func (*LivepeerNode) GetServiceURI added in v0.5.0

func (n *LivepeerNode) GetServiceURI() *url.URL

func (*LivepeerNode) SetBasePrice added in v0.5.0

func (n *LivepeerNode) SetBasePrice(price *big.Rat)

SetBasePrice sets the base price for an orchestrator on the node

func (*LivepeerNode) SetServiceURI added in v0.5.0

func (n *LivepeerNode) SetServiceURI(newUrl *url.URL)

type LoadBalancingTranscoder added in v0.5.2

type LoadBalancingTranscoder struct {
	// contains filtered or unexported fields
}

func (*LoadBalancingTranscoder) Transcode added in v0.5.2

type LocalTranscoder added in v0.5.0

type LocalTranscoder struct {
	// contains filtered or unexported fields
}

func (*LocalTranscoder) Transcode added in v0.5.0

func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error)

type ManifestID

type ManifestID string

func RandomManifestID added in v0.5.0

func RandomManifestID() ManifestID

type NodeType added in v0.3.3

type NodeType int
const (
	DefaultNode NodeType = iota
	BroadcasterNode
	OrchestratorNode
	TranscoderNode
	RedeemerNode
)

func (NodeType) String added in v0.5.20

func (t NodeType) String() string

type NvidiaTranscoder added in v0.5.0

type NvidiaTranscoder struct {
	// contains filtered or unexported fields
}

func (*NvidiaTranscoder) Stop added in v0.5.2

func (nv *NvidiaTranscoder) Stop()

func (*NvidiaTranscoder) Transcode added in v0.5.0

func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error)

type PlaylistManager added in v0.5.0

type PlaylistManager interface {
	ManifestID() ManifestID
	// Implicitly creates master and media playlists
	// Inserts in media playlist given a link to a segment
	InsertHLSSegment(profile *ffmpeg.VideoProfile, seqNo uint64, uri string, duration float64) error

	InsertHLSSegmentJSON(profile *ffmpeg.VideoProfile, seqNo uint64, uri string, duration float64)

	GetHLSMasterPlaylist() *m3u8.MasterPlaylist

	GetHLSMediaPlaylist(rendition string) *m3u8.MediaPlaylist

	GetOSSession() drivers.OSSession

	GetRecordOSSession() drivers.OSSession

	FlushRecord()

	Cleanup()
}

PlaylistManager manages playlists and data for one video stream, backed by one object storage.

type RemoteTranscoder added in v0.5.0

type RemoteTranscoder struct {
	// contains filtered or unexported fields
}

func NewRemoteTranscoder added in v0.5.0

func NewRemoteTranscoder(m *RemoteTranscoderManager, stream net.Transcoder_RegisterTranscoderServer, capacity int, caps *Capabilities) *RemoteTranscoder

func (*RemoteTranscoder) Transcode added in v0.5.0

Transcode do actual transcoding by sending work to remote transcoder and waiting for the result

type RemoteTranscoderFatalError added in v0.5.0

type RemoteTranscoderFatalError struct {
	// contains filtered or unexported fields
}

RemoteTranscoderFatalError wraps error to indicate that error is fatal

type RemoteTranscoderManager added in v0.5.0

type RemoteTranscoderManager struct {
	RTmutex sync.Mutex
	// contains filtered or unexported fields
}

func NewRemoteTranscoderManager added in v0.5.0

func NewRemoteTranscoderManager() *RemoteTranscoderManager

func (*RemoteTranscoderManager) Manage added in v0.5.0

func (rtm *RemoteTranscoderManager) Manage(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities)

Manage adds transcoder to list of live transcoders. Doesn't return until transcoder disconnects

func (*RemoteTranscoderManager) RegisteredTranscodersCount added in v0.5.0

func (rtm *RemoteTranscoderManager) RegisteredTranscodersCount() int

RegisteredTranscodersCount returns number of registered transcoders

func (*RemoteTranscoderManager) RegisteredTranscodersInfo added in v0.5.0

func (rtm *RemoteTranscoderManager) RegisteredTranscodersInfo() []common.RemoteTranscoderInfo

RegisteredTranscodersInfo returns list of restered transcoder's information

func (*RemoteTranscoderManager) Transcode added in v0.5.0

Transcode does actual transcoding using remote transcoder from the pool

type RemoteTranscoderResult added in v0.5.0

type RemoteTranscoderResult struct {
	TranscodeData *TranscodeData
	Err           error
}

type SegChanData added in v0.3.3

type SegChanData struct {
	// contains filtered or unexported fields
}

type SegTranscodingMetadata added in v0.5.0

type SegTranscodingMetadata struct {
	ManifestID         ManifestID
	Fname              string
	Seq                int64
	Hash               ethcommon.Hash
	Profiles           []ffmpeg.VideoProfile
	OS                 *net.OSInfo
	Duration           time.Duration
	Caps               *Capabilities
	AuthToken          *net.AuthToken
	DetectorEnabled    bool
	DetectorProfiles   []ffmpeg.DetectorProfile
	CalcPerceptualHash bool
}

func (*SegTranscodingMetadata) Flatten added in v0.5.0

func (md *SegTranscodingMetadata) Flatten() []byte

type SegmentChan added in v0.3.3

type SegmentChan chan *SegChanData

type StreamID

type StreamID struct {
	// Base playback ID that related renditions are grouped under
	ManifestID ManifestID

	// Specifies the stream variant: the HLS source, transcoding profile, etc.
	// Also used for RTMP: when unguessable,this can function as a stream key.
	Rendition string
}

The StreamID represents a particular variant of a stream.

func MakeStreamID

func MakeStreamID(mid ManifestID, profile *ffmpeg.VideoProfile) StreamID

func MakeStreamIDFromString added in v0.5.0

func MakeStreamIDFromString(mid string, rendition string) StreamID

func SplitStreamIDString added in v0.5.0

func SplitStreamIDString(str string) StreamID

func (StreamID) String

func (id StreamID) String() string

type StreamParameters added in v0.5.9

type StreamParameters struct {
	ManifestID       ManifestID
	ExternalStreamID string
	SessionID        string
	RtmpKey          string
	Profiles         []ffmpeg.VideoProfile
	Resolution       string
	Format           ffmpeg.Format
	OS               drivers.OSSession
	RecordOS         drivers.OSSession
	Capabilities     *Capabilities
	Detection        DetectionConfig
	VerificationFreq uint
	Nonce            uint64
	Codec            ffmpeg.VideoCodec
}

func (*StreamParameters) StreamID added in v0.5.9

func (s *StreamParameters) StreamID() string

type TranscodeData added in v0.5.0

type TranscodeData struct {
	Segments   []*TranscodedSegmentData
	Pixels     int64 // Decoded pixels
	Detections []ffmpeg.DetectData
}

TranscodeData contains the transcoding output for an input segment

type TranscodeResult added in v0.3.3

type TranscodeResult struct {
	Err           error
	Sig           []byte
	TranscodeData *TranscodeData
	OS            drivers.OSSession
}

type TranscodedSegmentData added in v0.5.0

type TranscodedSegmentData struct {
	Data   []byte
	PHash  []byte // Perceptual hash data (maybe nil)
	Pixels int64  // Encoded pixels
}

TranscodedSegmentData contains encoded data for a profile

type Transcoder added in v0.3.3

type Transcoder interface {
	Transcode(ctx context.Context, md *SegTranscodingMetadata) (*TranscodeData, error)
}

func NewLoadBalancingTranscoder added in v0.5.2

func NewLoadBalancingTranscoder(devices []string, newTranscoderFn newTranscoderFn,
	newTranscoderWithDetectorFn newTranscoderWithDetectorFn) Transcoder

func NewLocalTranscoder added in v0.5.0

func NewLocalTranscoder(workDir string) Transcoder

type TranscoderChan added in v0.5.0

type TranscoderChan chan *RemoteTranscoderResult

type TranscoderSession added in v0.5.2

type TranscoderSession interface {
	Transcoder
	Stop()
}

func NewNvidiaTranscoder added in v0.5.0

func NewNvidiaTranscoder(gpu string) TranscoderSession

func NewNvidiaTranscoderWithDetector added in v0.5.20

func NewNvidiaTranscoderWithDetector(detector ffmpeg.DetectorProfile, gpu string) (TranscoderSession, error)

type UnrecoverableError added in v0.5.23

type UnrecoverableError struct {
	// contains filtered or unexported fields
}

func NewUnrecoverableError added in v0.5.23

func NewUnrecoverableError(err error) UnrecoverableError

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL