basicnet

package module
v0.0.0-...-dc99854 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2018 License: MIT Imports: 36 Imported by: 0

README

Build Status

go-livepeer-basicnet

Implementation of the Livepeer VideoNetwork interface. Built with libp2p.

Installation

gx and gx-go are required.

go get -u github.com/whyrusleeping/gx
go get -u github.com/whyrusleeping/gx-go
gx-go get github.com/livepeer/go-livepeer-basicnet

Testing

To run a specific test: go test -run Reconnect to run TestReconnect from the basic_network_test.go file.

To run all the tests in a specific file: make network_node_test to run network_node_test.go

To run all tests: go test

Updating libp2p packages

For now this is an interative and error-prone process.

# Turn gx hashes into github.com import paths
gx-go uw
# Update package.json as needed (usually can just copy from go-ipfs)
gx-go rw
# Remove gx directory in GOPATH/src
rm -rf $GOPATH/src/gx # maybe back this up first!
gx install

Follow instructions on screen. If it fails due to a dependency or IPFS error: ignore, remove gx-workspace-update.json, and retry the command. Repeat until git diff looks somewhat correct. In particular, the package.json should be updated and the gx hashes replaced by Github paths.

Run gx-go rewrite to rewrite the gx hashes and then check with go test.

If compilation fails, and the errors seem to be clustered around version conflicts of a particular package, you might have to gx workspace-update that package manually to knock it down to a compatible version.

Good luck, have fun.

Documentation

Overview

The BasicVideoNetwork is a push-based streaming protocol. It works as follow:

  • When a video is broadcasted, it's stored at a local broadcaster
  • When a viewer wants to view a video, it sends a subscribe request to the network
  • The network routes the request towards the broadcast node via kademlia routing

Index

Constants

View Source
const ConnTimeout = 3 * time.Second
View Source
const DefaultBroadcasterBufferSegSendInterval = time.Second
View Source
const DefaultBroadcasterBufferSize = 3
View Source
const DefaultTranscodeResponseRelayDuplication = 2
View Source
const RelayGCTime = 60 * time.Second
View Source
const RelayTicker = 10 * time.Second

Variables

View Source
var ConnFileWriteFreq = time.Duration(60) * time.Second
View Source
var ErrGetMasterPlaylist = errors.New("ErrGetMasterPlaylist")
View Source
var ErrHandleMsg = errors.New("ErrHandleMsg")
View Source
var ErrNoClosePeers = errors.New("NoClosePeers")
View Source
var ErrOutStream = errors.New("ErrOutStream")
View Source
var ErrProtocol = errors.New("ProtocolError")
View Source
var ErrSendMsg = errors.New("ErrSendMsg")
View Source
var ErrSubscriber = errors.New("ErrSubscriber")
View Source
var ErrTranscodeResponse = errors.New("TranscodeResponseError")
View Source
var ErrUnknownMsg = errors.New("UnknownMsgType")
View Source
var GetMasterPlaylistRelayWait = 10 * time.Second
View Source
var GetResponseWithRelayWait = 10 * time.Second
View Source
var InsertDataWaitTime = time.Second * 10
View Source
var MsgSentTimeCacheDuration = 2 * time.Second
View Source
var Protocol = protocol.ID("/livepeer_video/0.0.1")
View Source
var SubscriberDataInsertTimeout = time.Second * 300
View Source
var TranscodeSubTimeout = time.Second * 60

Functions

func SetTranscodeSubTimeout

func SetTranscodeSubTimeout(to time.Duration)

Types

type BasicBroadcaster

type BasicBroadcaster struct {
	Network *BasicVideoNetwork

	StrmID string
	// contains filtered or unexported fields
}

BasicBroadcaster is unique for a specific video stream. It keeps track of a list of listeners and a queue of video chunks. It won't start keeping track of things until there is at least 1 listener.

func (*BasicBroadcaster) AddListeningPeer

func (br *BasicBroadcaster) AddListeningPeer(nw *BasicVideoNetwork, pid peer.ID)

func (*BasicBroadcaster) AddListeningStream

func (br *BasicBroadcaster) AddListeningStream(key string, os OutStream)

func (*BasicBroadcaster) Broadcast

func (b *BasicBroadcaster) Broadcast(seqNo uint64, data []byte) error

Broadcast sends a video chunk to the stream. The very first call to Broadcast kicks off a worker routine to do the broadcasting.

func (*BasicBroadcaster) Finish

func (b *BasicBroadcaster) Finish() error

Finish signals the stream is finished. It cancels the broadcasting worker routine and sends the Finish message to all the listeners.

func (*BasicBroadcaster) IsLive

func (b *BasicBroadcaster) IsLive() bool

func (BasicBroadcaster) String

func (b BasicBroadcaster) String() string

type BasicInStream

type BasicInStream struct {
	Stream net.Stream
	// contains filtered or unexported fields
}

BasicStream is a libp2p stream wrapped in a reader and a writer.

func NewBasicInStream

func NewBasicInStream(s net.Stream) *BasicInStream

NewBasicStream creates a stream from a libp2p raw stream.

func (*BasicInStream) ReceiveMessage

func (bs *BasicInStream) ReceiveMessage() (Msg, error)

ReceiveMessage takes a message off the stream.

type BasicNetworkNode

type BasicNetworkNode struct {
	Identity peer.ID // the local node's identity
	Kad      *kad.IpfsDHT
	PeerHost host.Host // the network host (server+client)
	Network  *BasicVideoNetwork
	// contains filtered or unexported fields
}

func NewNode

func NewNode(listenAddrs []ma.Multiaddr, priv crypto.PrivKey, pub crypto.PubKey, f *BasicNotifiee) (*BasicNetworkNode, error)

NewNode creates a new Livepeerd node.

func (*BasicNetworkNode) AddPeer

func (n *BasicNetworkNode) AddPeer(pi peerstore.PeerInfo, ttl time.Duration)

func (*BasicNetworkNode) ClosestLocalPeers

func (bn *BasicNetworkNode) ClosestLocalPeers(strmID string) ([]peer.ID, error)

func (*BasicNetworkNode) Connect

func (*BasicNetworkNode) GetDHT

func (n *BasicNetworkNode) GetDHT() *kad.IpfsDHT

func (*BasicNetworkNode) GetOutStream

func (n *BasicNetworkNode) GetOutStream(pid peer.ID) *BasicOutStream

func (*BasicNetworkNode) GetPeerInfo

func (n *BasicNetworkNode) GetPeerInfo(p peer.ID) peerstore.PeerInfo

func (*BasicNetworkNode) GetPeers

func (n *BasicNetworkNode) GetPeers() []peer.ID

func (*BasicNetworkNode) Host

func (n *BasicNetworkNode) Host() host.Host

func (*BasicNetworkNode) ID

func (n *BasicNetworkNode) ID() peer.ID

func (*BasicNetworkNode) RefreshOutStream

func (n *BasicNetworkNode) RefreshOutStream(pid peer.ID) *BasicOutStream

func (*BasicNetworkNode) RemovePeer

func (n *BasicNetworkNode) RemovePeer(id peer.ID)

func (*BasicNetworkNode) RemoveStream

func (n *BasicNetworkNode) RemoveStream(pid peer.ID)

func (*BasicNetworkNode) SetSignFun

func (n *BasicNetworkNode) SetSignFun(sign func(data []byte) ([]byte, error))

func (*BasicNetworkNode) SetStreamHandler

func (n *BasicNetworkNode) SetStreamHandler(pid protocol.ID, handler inet.StreamHandler)

func (*BasicNetworkNode) SetVerifyTranscoderSig

func (n *BasicNetworkNode) SetVerifyTranscoderSig(verify func(data []byte, sig []byte, strmID string) bool)

func (*BasicNetworkNode) Sign

func (n *BasicNetworkNode) Sign(data []byte) ([]byte, error)

func (*BasicNetworkNode) VerifyTranscoderSig

func (n *BasicNetworkNode) VerifyTranscoderSig(data []byte, sig []byte, strmID string) bool

type BasicNotifiee

type BasicNotifiee struct {
	// contains filtered or unexported fields
}

BasicNotifiee gets called during important libp2p events

func NewBasicNotifiee

func NewBasicNotifiee(mon *lpmon.Monitor) *BasicNotifiee

func (*BasicNotifiee) ClosedStream

func (bn *BasicNotifiee) ClosedStream(n net.Network, s net.Stream)

called when a stream closed

func (*BasicNotifiee) Connected

func (bn *BasicNotifiee) Connected(n net.Network, conn net.Conn)

called when a connection opened

func (*BasicNotifiee) Disconnected

func (bn *BasicNotifiee) Disconnected(n net.Network, conn net.Conn)

called when a connection closed

func (*BasicNotifiee) HandleDisconnect

func (bn *BasicNotifiee) HandleDisconnect(h func(pid peer.ID))

func (*BasicNotifiee) Listen

func (bn *BasicNotifiee) Listen(n net.Network, addr ma.Multiaddr)

called when network starts listening on an addr

func (*BasicNotifiee) ListenClose

func (bn *BasicNotifiee) ListenClose(n net.Network, addr ma.Multiaddr)

called when network starts listening on an addr

func (*BasicNotifiee) OpenedStream

func (bn *BasicNotifiee) OpenedStream(n net.Network, s net.Stream)

called when a stream opened

type BasicOutStream

type BasicOutStream struct {
	Stream net.Stream
	// contains filtered or unexported fields
}

BasicStream is a libp2p stream wrapped in a reader and a writer.

func NewBasicOutStream

func NewBasicOutStream(s net.Stream) *BasicOutStream

NewBasicStream creates a stream from a libp2p raw stream.

func (*BasicOutStream) GetRemotePeer

func (bs *BasicOutStream) GetRemotePeer() peer.ID

func (*BasicOutStream) SendMessage

func (bs *BasicOutStream) SendMessage(opCode Opcode, data interface{}) error

SendMessage writes a message into the stream.

type BasicRelayer

type BasicRelayer struct {
	Network      *BasicVideoNetwork
	UpstreamPeer peer.ID

	LastRelay time.Time
	// contains filtered or unexported fields
}

BasicRelayer relays video segments to listners. Unlike BasicBroadcaster, BasicRelayer is does NOT have a worker - it sends out the chunks to its listeners as soon as it gets one from the network.

func (*BasicRelayer) AddListener

func (br *BasicRelayer) AddListener(nw *BasicVideoNetwork, pid peer.ID)

func (*BasicRelayer) RelayFinishStream

func (br *BasicRelayer) RelayFinishStream(nw *BasicVideoNetwork, fs FinishStreamMsg) error

func (*BasicRelayer) RelayMasterPlaylistData

func (br *BasicRelayer) RelayMasterPlaylistData(nw *BasicVideoNetwork, mpld MasterPlaylistDataMsg) error

func (*BasicRelayer) RelayNodeStatusData

func (br *BasicRelayer) RelayNodeStatusData(nw *BasicVideoNetwork, nsd NodeStatusDataMsg) error

func (*BasicRelayer) RelayStreamData

func (br *BasicRelayer) RelayStreamData(sd *StreamDataMsg) error

RelayStreamData sends a StreamDataMsg to its listeners

func (BasicRelayer) String

func (br BasicRelayer) String() string

type BasicReporter

type BasicReporter struct{}

func (*BasicReporter) GetBandwidthForPeer

func (br *BasicReporter) GetBandwidthForPeer(peer.ID) metrics.Stats

func (*BasicReporter) GetBandwidthForProtocol

func (br *BasicReporter) GetBandwidthForProtocol(protocol.ID) metrics.Stats

func (*BasicReporter) GetBandwidthTotals

func (br *BasicReporter) GetBandwidthTotals() metrics.Stats

func (*BasicReporter) LogRecvMessage

func (br *BasicReporter) LogRecvMessage(num int64)

func (*BasicReporter) LogRecvMessageStream

func (br *BasicReporter) LogRecvMessageStream(num int64, prot protocol.ID, p peer.ID)

func (*BasicReporter) LogSentMessage

func (br *BasicReporter) LogSentMessage(num int64)

func (*BasicReporter) LogSentMessageStream

func (br *BasicReporter) LogSentMessageStream(num int64, prot protocol.ID, p peer.ID)

type BasicSubscriber

type BasicSubscriber struct {
	Network *BasicVideoNetwork

	// networkStream *BasicStream
	StrmID       string
	UpstreamPeer peer.ID
	// contains filtered or unexported fields
}

BasicSubscriber keeps track of

func (*BasicSubscriber) ClosedStream

func (s *BasicSubscriber) ClosedStream(n inet.Network, st inet.Stream)

func (*BasicSubscriber) Connected

func (s *BasicSubscriber) Connected(n inet.Network, conn inet.Conn)

func (*BasicSubscriber) Disconnected

func (s *BasicSubscriber) Disconnected(n inet.Network, conn inet.Conn)

func (*BasicSubscriber) HandleConnection

func (s *BasicSubscriber) HandleConnection(conn inet.Conn)

Notifiee

func (*BasicSubscriber) InsertData

func (s *BasicSubscriber) InsertData(sd *StreamDataMsg) error

func (*BasicSubscriber) IsLive

func (s *BasicSubscriber) IsLive() bool

func (*BasicSubscriber) Listen

func (s *BasicSubscriber) Listen(n inet.Network, m ma.Multiaddr)

func (*BasicSubscriber) ListenClose

func (s *BasicSubscriber) ListenClose(n inet.Network, m ma.Multiaddr)

func (*BasicSubscriber) OpenedStream

func (s *BasicSubscriber) OpenedStream(n inet.Network, st inet.Stream)

func (BasicSubscriber) String

func (s BasicSubscriber) String() string

func (*BasicSubscriber) Subscribe

func (s *BasicSubscriber) Subscribe(ctx context.Context, gotData func(seqNo uint64, data []byte, eof bool)) error

Subscribe kicks off a go routine that calls the gotData func for every new video chunk

func (*BasicSubscriber) TranscoderSubscribe

func (s *BasicSubscriber) TranscoderSubscribe(ctx context.Context, gotData func(seqNo uint64, data []byte, eof bool)) error

func (*BasicSubscriber) Unsubscribe

func (s *BasicSubscriber) Unsubscribe() error

Unsubscribe unsubscribes from the broadcast

type BasicVideoNetwork

type BasicVideoNetwork struct {
	NetworkNode NetworkNode
	// contains filtered or unexported fields
}

BasicVideoNetwork implements the VideoNetwork interface. It creates a kademlia network using libp2p. It does push-based video delivery, and handles the protocol in the background.

func NewBasicVideoNetwork

func NewBasicVideoNetwork(n *BasicNetworkNode, publicIP string, port int) (*BasicVideoNetwork, error)

NewBasicVideoNetwork creates a libp2p node, handle the basic (push-based) video protocol.

func (*BasicVideoNetwork) Connect

func (n *BasicVideoNetwork) Connect(nodeID string, addrs []string) error

Connect connects a node to the Livepeer network.

func (*BasicVideoNetwork) GetBroadcaster

func (n *BasicVideoNetwork) GetBroadcaster(strmID string) (stream.Broadcaster, error)

GetBroadcaster gets a broadcaster for a streamID. If it doesn't exist, create a new one.

func (*BasicVideoNetwork) GetLocalStreams

func (n *BasicVideoNetwork) GetLocalStreams() []string

func (*BasicVideoNetwork) GetMasterPlaylist

func (n *BasicVideoNetwork) GetMasterPlaylist(p string, manifestID string) (chan *m3u8.MasterPlaylist, error)

GetMasterPlaylist issues a request to the broadcaster for the MasterPlaylist and returns the channel to the playlist. The broadcaster should send the response back as soon as it gets the request.

func (*BasicVideoNetwork) GetNodeID

func (n *BasicVideoNetwork) GetNodeID() string

GetNodeID gets the node id

func (*BasicVideoNetwork) GetNodeStatus

func (n *BasicVideoNetwork) GetNodeStatus(nodeID string) (chan *lpnet.NodeStatus, error)

func (*BasicVideoNetwork) GetSubscriber

func (n *BasicVideoNetwork) GetSubscriber(strmID string) (stream.Subscriber, error)

GetSubscriber gets a subscriber for a streamID. If it doesn't exist, create a new one.

func (*BasicVideoNetwork) NewRelayer

func (n *BasicVideoNetwork) NewRelayer(strmID string, opcode Opcode) *BasicRelayer

NewRelayer creates a new relayer.

func (*BasicVideoNetwork) Ping

func (n *BasicVideoNetwork) Ping(nid string) (chan struct{}, error)

func (*BasicVideoNetwork) ReceivedTranscodeResponse

func (n *BasicVideoNetwork) ReceivedTranscodeResponse(strmID string, gotResult func(transcodeResult map[string]string))

ReceivedTranscodeResponse sends a request and registers the callback for when the broadcaster receives transcode results.

func (*BasicVideoNetwork) SendTranscodeResponse

func (n *BasicVideoNetwork) SendTranscodeResponse(broadcaster string, strmID string, transcodedVideos map[string]string) error

SendTranscodeResponse sends the transcode result to the broadcast node.

func (*BasicVideoNetwork) SetBroadcaster

func (n *BasicVideoNetwork) SetBroadcaster(strmID string, b *BasicBroadcaster)

func (*BasicVideoNetwork) SetSubscriber

func (n *BasicVideoNetwork) SetSubscriber(strmID string, s *BasicSubscriber)

func (*BasicVideoNetwork) SetupProtocol

func (n *BasicVideoNetwork) SetupProtocol() error

SetupProtocol sets up the protocol so we can handle incoming messages

func (*BasicVideoNetwork) String

func (n *BasicVideoNetwork) String() string

func (*BasicVideoNetwork) TranscodeSub

func (n *BasicVideoNetwork) TranscodeSub(ctx context.Context, strmID string, gotData func(seqNo uint64, data []byte, eof bool)) error

SendTranscodeSub requests a transcoder subscription from the broadcast node.

func (*BasicVideoNetwork) UpdateMasterPlaylist

func (n *BasicVideoNetwork) UpdateMasterPlaylist(strmID string, mpl *m3u8.MasterPlaylist) error

UpdateMasterPlaylist updates the copy of the master playlist so any node can request it.

type CancelSubMsg

type CancelSubMsg struct {
	StrmID string
}

type FinishStreamMsg

type FinishStreamMsg struct {
	StrmID string
}

type GetMasterPlaylistReqMsg

type GetMasterPlaylistReqMsg struct {
	ManifestID string
}

type InStream

type InStream interface {
	ReceiveMessage() (Msg, error)
}

type LocalOutStream

type LocalOutStream struct {
	// contains filtered or unexported fields
}

func NewLocalOutStream

func NewLocalOutStream(s *BasicSubscriber) *LocalOutStream

func (*LocalOutStream) GetRemotePeer

func (bs *LocalOutStream) GetRemotePeer() peer.ID

func (*LocalOutStream) SendMessage

func (bs *LocalOutStream) SendMessage(opCode Opcode, data interface{}) error

type MasterPlaylistDataMsg

type MasterPlaylistDataMsg struct {
	ManifestID string
	MPL        string
	NotFound   bool
}

type Msg

type Msg struct {
	Op   Opcode
	Data interface{}
}

func (Msg) MarshalJSON

func (m Msg) MarshalJSON() ([]byte, error)

func (*Msg) UnmarshalJSON

func (m *Msg) UnmarshalJSON(b []byte) error

type NetworkNode

type NetworkNode interface {
	ID() peer.ID
	Host() host.Host
	GetOutStream(pid peer.ID) *BasicOutStream
	RefreshOutStream(pid peer.ID) *BasicOutStream
	RemoveStream(pid peer.ID)
	GetPeers() []peer.ID
	GetPeerInfo(peer.ID) peerstore.PeerInfo
	Connect(context.Context, peerstore.PeerInfo) error
	AddPeer(peerstore.PeerInfo, time.Duration)
	RemovePeer(peer.ID)
	ClosestLocalPeers(strmID string) ([]peer.ID, error)
	GetDHT() *kad.IpfsDHT
	SetStreamHandler(pid protocol.ID, handler inet.StreamHandler)
	SetSignFun(sign func(data []byte) ([]byte, error))
	Sign(data []byte) ([]byte, error)
	SetVerifyTranscoderSig(verify func(data []byte, sig []byte, strmID string) bool)
	VerifyTranscoderSig(data []byte, sig []byte, strmID string) bool
}

type NodeStatusDataMsg

type NodeStatusDataMsg struct {
	NodeID   string
	Data     []byte
	NotFound bool
}

type NodeStatusReqMsg

type NodeStatusReqMsg struct {
	NodeID string
}

type Opcode

type Opcode uint8
const (
	StreamDataID Opcode = iota
	FinishStreamID
	SubReqID
	CancelSubID
	TranscodeResponseID
	TranscodeSubID
	GetMasterPlaylistReqID
	MasterPlaylistDataID
	NodeStatusReqID
	NodeStatusDataID
	PingID
	PongID
	SimpleString
)

type OutStream

type OutStream interface {
	GetRemotePeer() peer.ID
	SendMessage(opCode Opcode, data interface{}) error
}

type PeerCache

type PeerCache struct {
	Peerstore peerstore.Peerstore
	Filename  string
}

func NewPeerCache

func NewPeerCache(peerStore peerstore.Peerstore, filename string) *PeerCache

func (*PeerCache) LoadPeers

func (pc *PeerCache) LoadPeers() []peerstore.PeerInfo

LoadPeers Load peer info from a file and try to connect to them

func (*PeerCache) Record

func (pc *PeerCache) Record(ctx context.Context)

Record Periodically write peers to a file

type PingDataMsg

type PingDataMsg string

type PongDataMsg

type PongDataMsg string

type StreamDataMsg

type StreamDataMsg struct {
	SeqNo  uint64
	StrmID string
	Data   []byte
}

type SubReqMsg

type SubReqMsg struct {
	StrmID string
}

type TranscodeResponseMsg

type TranscodeResponseMsg struct {
	//map of streamid -> video description
	StrmID string
	Result map[string]string
}

type TranscodeSubMsg

type TranscodeSubMsg struct {
	MultiAddrs []ma.Multiaddr
	NodeID     peer.ID
	StrmID     string
	Sig        []byte
}

func (TranscodeSubMsg) BytesForSigning

func (ts TranscodeSubMsg) BytesForSigning() []byte

func (*TranscodeSubMsg) GobDecode

func (ts *TranscodeSubMsg) GobDecode(data []byte) error

func (TranscodeSubMsg) GobEncode

func (ts TranscodeSubMsg) GobEncode() ([]byte, error)

type TranscodeSubMsg_b

type TranscodeSubMsg_b struct {
	MultiAddrs [][]byte
	NodeID     []byte
	StrmID     string
	Sig        []byte
}

struct that can be handled by gob; multiaddr can't

type VideoMuxer

type VideoMuxer interface {
	WriteSegment(seqNo uint64, strmID string, data []byte) error
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL