stream

package
v0.5.9-0...-ba7202b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2022 License: GPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HashSize     = 32
	BatchSize    = 64
	MinFrameSize = 16
)

Variables

View Source
var (

	// Protocol spec
	Spec = &protocols.Spec{
		Name:       "bzz-stream",
		Version:    8,
		MaxMsgSize: 10 * 1024 * 1024,
		Messages: []interface{}{
			StreamInfoReq{},
			StreamInfoRes{},
			GetRange{},
			OfferedHashes{},
			ChunkDelivery{},
			WantedHashes{},
		},
	}
)
View Source
var (
	SyncInitBackoff = 500 * time.Millisecond
)

Functions

This section is empty.

Types

type ChunkDelivery

type ChunkDelivery struct {
	Ruid   uint
	Chunks []DeliveredChunk
}

ChunkDelivery delivers a frame of chunks in response to a WantedHashes message

type DeliveredChunk

type DeliveredChunk struct {
	Addr storage.Address //chunk address
	Data []byte          //chunk data
}

DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message

type GetRange

type GetRange struct {
	Ruid      uint
	Stream    ID
	From      uint64
	To        *uint64 `rlp:"nil"`
	BatchSize uint
}

GetRange is a message sent from the downstream peer to the upstream peer asking for chunks within a particular interval for a certain stream

type ID

type ID struct {
	// Name is used for the Stream provider identification
	Name string
	// Key is the name of specific data stream within the stream provider. The semantics of this value
	// is at the discretion of the stream provider implementation
	Key string
}

Stream defines a unique stream identifier in a textual representation

func NewID

func NewID(name string, key string) ID

NewID returns a new Stream ID for a particular stream Name and Key

func (ID) String

func (s ID) String() string

String return a stream id based on all Stream fields.

type OfferedHashes

type OfferedHashes struct {
	Ruid      uint
	LastIndex uint64
	Hashes    []byte
}

OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter to selectively ask for chunks within a particular requested interval

type Peer

type Peer struct {
	*network.BzzPeer
	// contains filtered or unexported fields
}

Peer is the Peer extension for the streaming protocol

func (*Peer) InitProviders

func (p *Peer) InitProviders()

InitProviders initializes a provider for a certain peer

type PeerInfo

type PeerInfo struct {
	Base      string                       `json:"base"` // our node's base address
	Kademlia  string                       `json:"kademlia"`
	Peers     []PeerState                  `json:"peers"`
	Cursors   map[string]map[string]uint64 `json:"cursors"`
	Intervals map[string]string            `json:"intervals"`
}

PeerInfo holds information about the peer and it's peers.

type PeerState

type PeerState struct {
	Peer    string            `json:"peer"` // the peer address
	Cursors map[string]uint64 `json:"cursors"`
}

PeerState holds information about a connected peer.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is the base type that handles all client/server operations on a node it is instantiated once per stream protocol instance, that is, it should have one instance per node

func New

func New(intervalsStore state.Store, address *network.BzzAddr, providers ...StreamProvider) *Registry

New creates a new stream protocol handler

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) HandleMsg

func (r *Registry) HandleMsg(p *Peer) func(context.Context, interface{}) error

HandleMsg is the main message handler for the stream protocol

func (*Registry) LastReceivedChunkTime

func (r *Registry) LastReceivedChunkTime() time.Time

LastReceivedChunkTime returns the time when the last chunk was received by syncing. This method is used in api.Inspector to detect when the syncing is complete.

func (*Registry) PeerInfo

func (r *Registry) PeerInfo() (*PeerInfo, error)

PeerInfo returns a response in which the queried node's peer cursors and intervals are returned

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Run

func (r *Registry) Run(bp *network.BzzPeer) error

Run is being dispatched when 2 nodes connect

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

type StreamDescriptor

type StreamDescriptor struct {
	Stream  ID
	Cursor  uint64
	Bounded bool
}

StreamDescriptor describes an arbitrary stream

type StreamInfoReq

type StreamInfoReq struct {
	Streams []ID
}

StreamInfoReq is a request to get information about particular streams

type StreamInfoRes

type StreamInfoRes struct {
	Streams []StreamDescriptor
}

StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors

type StreamProvider

type StreamProvider interface {

	// NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not
	NeedData(ctx context.Context, addr ...chunk.Address) ([]bool, error)

	// Get a set of chunks identified by addr from the local storage
	Get(ctx context.Context, addr ...chunk.Address) ([]chunk.Chunk, error)

	// Put a set of chunks into the local storage
	Put(ctx context.Context, ch ...chunk.Chunk) (exists []bool, err error)

	// Set a set of chunks as synced in the localstore
	Set(ctx context.Context, addrs ...chunk.Address) error

	// Subscribe to a data stream from an arbitrary data source
	Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())

	// Cursor returns the last known Cursor for a given Stream Key string
	Cursor(string) (uint64, error)

	// InitPeer is a provider specific implementation on how to maintain running streams with
	// an arbitrary Peer. This method should always be run in a separate goroutine
	InitPeer(p *Peer)

	// WantStream indicates if we are interested in a stream
	WantStream(*Peer, ID) bool

	// StreamName returns the Name of the Stream (see ID)
	StreamName() string

	// ParseStream from a standard pipe-separated string and return the Stream Key
	ParseKey(string) (interface{}, error)

	// EncodeStream from a Stream Key to a Stream pipe-separated string representation
	EncodeKey(interface{}) (string, error)

	// Autostart indicates if the stream should autostart
	Autostart() bool

	// Boundedness indicates if the stream is bounded or not
	Boundedness() bool

	// Close the provider
	Close()
}

StreamProvider interface provides a lightweight abstraction that allows an easily-pluggable stream provider as part of the Stream! protocol specification. Since Stream! thoroughly defines the concepts of a stream, intervals, clients and servers, the interface therefore needs only a pluggable provider. The domain interpretable notions which are at the discretion of the implementing provider therefore are - sourcing data (get, put, subscribe for constant new data, and need data which is to decide whether to retrieve data or not), retrieving cursors from the data store, the implementation of which streams to maintain with a certain peer and providing functionality to expose, parse and encode values related to the string represntation of the stream

func NewSyncProvider

func NewSyncProvider(ns *storage.NetStore, kad *network.Kademlia, baseAddr *network.BzzAddr, autostart bool, syncOnlyWithinDepth bool) StreamProvider

NewSyncProvider creates a new sync provider that is used by the stream protocol to sink data and control its behaviour syncOnlyWithinDepth toggles stream establishment in reference to kademlia. When true - streams are established only within depth ( >=depth ). This is needed for Push Sync. When set to false, the streams are established on all bins as they did traditionally with Pull Sync.

type StreamState

type StreamState struct {
	Stream  ID
	Code    uint16
	Message string
}

StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state

type WantedHashes

type WantedHashes struct {
	Ruid      uint
	BitVector []byte
}

WantedHashes is a message sent from the downstream peer to the upstream peer in response to OfferedHashes in order to selectively ask for a particular chunks within an interval

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL