Documentation ¶
Index ¶
- Constants
- Variables
- type ChunkDelivery
- type DeliveredChunk
- type GetRange
- type ID
- type OfferedHashes
- type Peer
- type PeerInfo
- type PeerState
- type Registry
- func (r *Registry) APIs() []rpc.API
- func (r *Registry) HandleMsg(p *Peer) func(context.Context, interface{}) error
- func (r *Registry) LastReceivedChunkTime() time.Time
- func (r *Registry) PeerInfo() (*PeerInfo, error)
- func (r *Registry) Protocols() []p2p.Protocol
- func (r *Registry) Run(bp *network.BzzPeer) error
- func (r *Registry) Start(server *p2p.Server) error
- func (r *Registry) Stop() error
- type StreamDescriptor
- type StreamInfoReq
- type StreamInfoRes
- type StreamProvider
- type StreamState
- type WantedHashes
Constants ¶
const (
	HashSize     = 32
	BatchSize    = 64
	MinFrameSize = 16
)
Variables ¶
var (
	// Protocol spec
	Spec = &protocols.Spec{
		Name:       "bzz-stream",
		Version:    8,
		MaxMsgSize: 10 * 1024 * 1024,
		Messages: []interface{}{
			StreamInfoReq{},
			StreamInfoRes{},
			GetRange{},
			OfferedHashes{},
			ChunkDelivery{},
			WantedHashes{},
		},
	}
)
var (
SyncInitBackoff = 500 * time.Millisecond
)
Functions ¶
This section is empty.
Types ¶
type ChunkDelivery ¶
type ChunkDelivery struct {
	Ruid   uint
	Chunks []DeliveredChunk
}
ChunkDelivery delivers a frame of chunks in response to a WantedHashes message
type DeliveredChunk ¶
DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message
type GetRange ¶
GetRange is a message sent from the downstream peer to the upstream peer asking for chunks within a particular interval for a certain stream
type ID ¶
type ID struct {
	// Name is used for the Stream provider identification
	Name string
	// Key is the name of a specific data stream within the stream provider. The semantics of this value
	// are at the discretion of the stream provider implementation
	Key string
}
ID defines a unique stream identifier in a textual representation
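The StreamProvider documentation further down mentions a "pipe-separated string" form for streams. As a rough illustration only, the sketch below assumes that form is `Name|Key`; the actual encoding used by this package is not shown on this page, and `formatID` and `parseID` are hypothetical helpers, not part of the package API.

```go
package main

import (
	"fmt"
	"strings"
)

// ID mirrors the struct documented above.
type ID struct {
	Name string
	Key  string
}

// formatID is a hypothetical helper that joins Name and Key with a pipe.
// The "Name|Key" layout is an assumption made for this sketch.
func formatID(id ID) string {
	return id.Name + "|" + id.Key
}

// parseID is the hypothetical inverse of formatID. It splits at the first
// pipe so that a Key may itself contain further pipes.
func parseID(s string) (ID, error) {
	parts := strings.SplitN(s, "|", 2)
	if len(parts) != 2 || parts[0] == "" {
		return ID{}, fmt.Errorf("invalid stream id %q", s)
	}
	return ID{Name: parts[0], Key: parts[1]}, nil
}

func main() {
	s := formatID(ID{Name: "SYNC", Key: "6"})
	fmt.Println(s)

	id, err := parseID(s)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(id.Name, id.Key)
}
```

Splitting at the first separator only is a design choice of this sketch: it leaves the interpretation of Key to the provider, in line with the comment on the Key field.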
type OfferedHashes ¶
OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter to selectively ask for chunks within a particular requested interval
type Peer ¶
Peer is the Peer extension for the streaming protocol
func (*Peer) InitProviders ¶
func (p *Peer) InitProviders()
InitProviders initializes a provider for a certain peer
type PeerInfo ¶
type PeerInfo struct {
	Base      string                       `json:"base"` // our node's base address
	Kademlia  string                       `json:"kademlia"`
	Peers     []PeerState                  `json:"peers"`
	Cursors   map[string]map[string]uint64 `json:"cursors"`
	Intervals map[string]string            `json:"intervals"`
}
PeerInfo holds information about the peer and its peers.
type PeerState ¶
type PeerState struct {
	Peer    string            `json:"peer"` // the peer address
	Cursors map[string]uint64 `json:"cursors"`
}
PeerState holds information about a connected peer.
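Both PeerInfo and PeerState carry `json` struct tags, so they serialize directly with the standard library. The sketch below copies the two struct definitions from this page and encodes a sample value; the field values and the `encodePeerInfo` helper are invented for illustration and do not come from a real node.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PeerState and PeerInfo are copied from the declarations on this page.
type PeerState struct {
	Peer    string            `json:"peer"`
	Cursors map[string]uint64 `json:"cursors"`
}

type PeerInfo struct {
	Base      string                       `json:"base"`
	Kademlia  string                       `json:"kademlia"`
	Peers     []PeerState                  `json:"peers"`
	Cursors   map[string]map[string]uint64 `json:"cursors"`
	Intervals map[string]string            `json:"intervals"`
}

// encodePeerInfo is a hypothetical helper that renders a PeerInfo as JSON.
func encodePeerInfo(pi PeerInfo) (string, error) {
	b, err := json.Marshal(pi)
	if err != nil {
		return "", fmt.Errorf("encode peer info: %w", err)
	}
	return string(b), nil
}

func main() {
	// Sample values only; the stream key format is an assumption.
	pi := PeerInfo{
		Base: "ab",
		Peers: []PeerState{
			{Peer: "cd", Cursors: map[string]uint64{"SYNC|1": 7}},
		},
	}
	out, err := encodePeerInfo(pi)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Note that maps left nil are encoded as JSON `null` rather than `{}`, so a consumer of this structure should be prepared for both.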
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the base type that handles all client/server operations on a node. It is instantiated once per stream protocol instance, that is, there should be one instance per node.
func New ¶
func New(intervalsStore state.Store, address *network.BzzAddr, providers ...StreamProvider) *Registry
New creates a new stream protocol handler
func (*Registry) LastReceivedChunkTime ¶
LastReceivedChunkTime returns the time when the last chunk was received by syncing. This method is used in api.Inspector to detect when the syncing is complete.
func (*Registry) PeerInfo ¶
PeerInfo returns a response in which the queried node's peer cursors and intervals are returned
type StreamDescriptor ¶
StreamDescriptor describes an arbitrary stream
type StreamInfoReq ¶
type StreamInfoReq struct {
Streams []ID
}
StreamInfoReq is a request to get information about particular streams
type StreamInfoRes ¶
type StreamInfoRes struct {
Streams []StreamDescriptor
}
StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors
type StreamProvider ¶
type StreamProvider interface {
	// NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not
	NeedData(ctx context.Context, addr ...chunk.Address) ([]bool, error)
	// Get a set of chunks identified by addr from the local storage
	Get(ctx context.Context, addr ...chunk.Address) ([]chunk.Chunk, error)
	// Put a set of chunks into the local storage
	Put(ctx context.Context, ch ...chunk.Chunk) (exists []bool, err error)
	// Set a set of chunks as synced in the localstore
	Set(ctx context.Context, addrs ...chunk.Address) error
	// Subscribe to a data stream from an arbitrary data source
	Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())
	// Cursor returns the last known Cursor for a given Stream Key string
	Cursor(string) (uint64, error)
	// InitPeer is a provider specific implementation on how to maintain running streams with
	// an arbitrary Peer. This method should always be run in a separate goroutine
	InitPeer(p *Peer)
	// WantStream indicates if we are interested in a stream
	WantStream(*Peer, ID) bool
	// StreamName returns the Name of the Stream (see ID)
	StreamName() string
	// ParseKey parses a standard pipe-separated string and returns the Stream Key
	ParseKey(string) (interface{}, error)
	// EncodeKey encodes a Stream Key into its pipe-separated string representation
	EncodeKey(interface{}) (string, error)
	// Autostart indicates if the stream should autostart
	Autostart() bool
	// Boundedness indicates if the stream is bounded or not
	Boundedness() bool
	// Close the provider
	Close()
}
StreamProvider is a lightweight abstraction that makes stream providers easily pluggable within the Stream! protocol specification. Because Stream! fully defines the concepts of streams, intervals, clients and servers, the only part left to plug in is the provider. The following domain-specific notions are left to the discretion of the implementing provider: sourcing data (Get, Put, Subscribe for continuously arriving data, and NeedData, which decides whether data must be retrieved), retrieving cursors from the data store, deciding which streams to maintain with a given peer, and exposing, parsing and encoding values related to the string representation of the stream.
func NewSyncProvider ¶
func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, baseAddr *network.BzzAddr, autostart bool, syncOnlyWithinDepth bool) StreamProvider
NewSyncProvider creates a new sync provider that is used by the stream protocol to sink data and control its behaviour. syncOnlyWithinDepth toggles stream establishment in reference to Kademlia depth: when true, streams are established only within depth (bins >= depth), which is needed for Push Sync; when false, streams are established on all bins, as was traditionally done with Pull Sync.
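The effect of syncOnlyWithinDepth on bin selection can be sketched as a small predicate. This only illustrates the rule stated above; `wantBin`, `establishedBins` and `summary` are hypothetical helpers, and the real provider's decision logic is not shown on this page.

```go
package main

import "fmt"

// wantBin reports whether a stream should be established for the given bin.
// With syncOnlyWithinDepth set, only bins at or beyond depth qualify;
// otherwise every bin qualifies.
func wantBin(bin, depth int, syncOnlyWithinDepth bool) bool {
	if syncOnlyWithinDepth {
		return bin >= depth
	}
	return true
}

// establishedBins lists the bins in [0, maxBin] that pass wantBin.
func establishedBins(maxBin, depth int, syncOnlyWithinDepth bool) []int {
	var bins []int
	for bin := 0; bin <= maxBin; bin++ {
		if wantBin(bin, depth, syncOnlyWithinDepth) {
			bins = append(bins, bin)
		}
	}
	return bins
}

// summary formats the selection for display.
func summary(maxBin, depth int, syncOnlyWithinDepth bool) string {
	return fmt.Sprintf("withinDepth=%t depth=%d bins=%v",
		syncOnlyWithinDepth, depth, establishedBins(maxBin, depth, syncOnlyWithinDepth))
}

func main() {
	fmt.Println(summary(3, 2, true))  // only bins 2 and 3 qualify
	fmt.Println(summary(3, 2, false)) // all bins 0 through 3 qualify
}
```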
type StreamState ¶
StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state
type WantedHashes ¶
WantedHashes is a message sent from the downstream peer to the upstream peer in response to OfferedHashes in order to selectively ask for particular chunks within an interval
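Taken together, the message descriptions on this page suggest the following exchange: the downstream peer sends GetRange, the upstream peer answers with OfferedHashes, the downstream peer replies with WantedHashes, and the upstream peer finishes with ChunkDelivery. The sketch below models only the selection step on the downstream side. The field layouts of these messages are not shown here, so the concatenated-hash input and the `[]bool` result are assumptions, as are the helpers `splitHashes`, `selectWanted` and `fakeHash`; only HashSize is taken from the constants above.

```go
package main

import (
	"bytes"
	"fmt"
)

// HashSize matches the constant documented on this page.
const HashSize = 32

// fakeHash builds a recognizable HashSize-long value for the example.
func fakeHash(b byte) []byte {
	return bytes.Repeat([]byte{b}, HashSize)
}

// splitHashes cuts a concatenated byte slice into HashSize-long hashes.
func splitHashes(b []byte) ([][]byte, error) {
	if len(b)%HashSize != 0 {
		return nil, fmt.Errorf("length %d is not a multiple of %d", len(b), HashSize)
	}
	out := make([][]byte, 0, len(b)/HashSize)
	for i := 0; i < len(b); i += HashSize {
		out = append(out, b[i:i+HashSize])
	}
	return out, nil
}

// selectWanted marks, per offered hash, whether the downstream peer still
// needs the chunk. `have` stands in for a local store lookup.
func selectWanted(offered [][]byte, have map[string]bool) []bool {
	want := make([]bool, len(offered))
	for i, h := range offered {
		want[i] = !have[string(h)]
	}
	return want
}

func main() {
	// Upstream offers three hashes; downstream already holds the second.
	payload := append(append(fakeHash(1), fakeHash(2)...), fakeHash(3)...)
	offered, err := splitHashes(payload)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	have := map[string]bool{string(fakeHash(2)): true}
	fmt.Println(selectWanted(offered, have))
}
```

Answering with one flag per offered hash keeps the reply compact, since the upstream peer already knows the hashes and their order.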