newstream

package
v0.4.3

This package is not in the latest version of its module.
Published: Jul 25, 2019 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Constants

const (
	HashSize  = 32
	BatchSize = 16
)

Variables

var ErrEmptyBatch = errors.New("empty batch")
var SyncerSpec = &protocols.Spec{
	Name:       "bzz-stream",
	Version:    8,
	MaxMsgSize: 10 * 1024 * 1024,
	Messages: []interface{}{
		StreamInfoReq{},
		StreamInfoRes{},
		GetRange{},
		OfferedHashes{},
		ChunkDelivery{},
		WantedHashes{},
	},
}
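
The order of the Messages slice matters on the wire. In the devp2p protocols helper a message's code is normally its index in this list, so reordering entries changes the wire format. The following is a minimal sketch of that mapping, assuming index-based codes; the empty structs and the messageCodes helper are stand-ins declared only for this example and are not part of the package.

```go
package main

import (
	"fmt"
	"reflect"
)

// Empty stand-ins for the documented message types, declared locally so the
// sketch compiles without the swarm module.
type (
	StreamInfoReq struct{}
	StreamInfoRes struct{}
	GetRange      struct{}
	OfferedHashes struct{}
	ChunkDelivery struct{}
	WantedHashes  struct{}
)

// messages lists the types in the same order as SyncerSpec.Messages.
var messages = []interface{}{
	StreamInfoReq{},
	StreamInfoRes{},
	GetRange{},
	OfferedHashes{},
	ChunkDelivery{},
	WantedHashes{},
}

// messageCodes is a hypothetical helper that maps each message type name to
// its position in the list, assuming codes are assigned by index.
func messageCodes(msgs []interface{}) map[string]uint64 {
	codes := make(map[string]uint64, len(msgs))
	for i, m := range msgs {
		codes[reflect.TypeOf(m).Name()] = uint64(i)
	}
	return codes
}

func main() {
	codes := messageCodes(messages)
	for _, m := range messages {
		name := reflect.TypeOf(m).Name()
		fmt.Printf("%d %s\n", codes[name], name)
	}
}
```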

Functions

This section is empty.

Types

type API

type API struct {
	*SlipStream
}

API wraps SlipStream and exposes additional public methods through the API for pss

func NewAPI

func NewAPI(s *SlipStream) *API

type ChunkDelivery

type ChunkDelivery struct {
	Ruid      uint
	LastIndex uint
	Chunks    []DeliveredChunk
}

ChunkDelivery delivers a frame of chunks in response to a WantedHashes message

type DeliveredChunk

type DeliveredChunk struct {
	Addr storage.Address //chunk address
	Data []byte          //chunk data
}

DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message

type GetRange

type GetRange struct {
	Ruid      uint
	Stream    ID
	From      uint64
	To        uint64 `rlp:nil`
	BatchSize uint
	Roundtrip bool
}

GetRange is a message sent from the downstream peer to the upstream peer asking for chunks within a particular interval for a certain stream

type ID

type ID struct {
	// Name is used for the Stream provider identification
	Name string
	// Key is the name of a specific data stream within the stream provider. The semantics of this value
	// are at the discretion of the stream provider implementation
	Key string
}

ID defines a unique stream identifier in a textual representation

func NewID

func NewID(name string, key string) ID

NewID returns a new Stream ID for a particular stream Name and Key

func (ID) String

func (s ID) String() string

String returns a stream ID string built from all ID fields.
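
The StreamProvider comments further down refer to a "standard pipe-separated string", which suggests that the textual form joins Name and Key with a pipe. The following is a minimal sketch under that assumption. The ID type and NewID are redeclared locally so the example runs on its own, the exact String format is assumed rather than confirmed, and parseID is a hypothetical helper that does not exist in the package.

```go
package main

import (
	"fmt"
	"strings"
)

// ID mirrors the documented struct; it is redeclared here only so the sketch
// compiles without the swarm module.
type ID struct {
	Name string
	Key  string
}

// NewID mirrors the documented constructor.
func NewID(name string, key string) ID {
	return ID{Name: name, Key: key}
}

// String joins Name and Key with a pipe. The format is an assumption based on
// the "pipe-separated" wording in the StreamProvider comments.
func (s ID) String() string {
	return s.Name + "|" + s.Key
}

// parseID is a hypothetical inverse of String. It splits on the first pipe,
// so a Key may itself contain further pipes.
func parseID(str string) (ID, error) {
	parts := strings.SplitN(str, "|", 2)
	if len(parts) != 2 {
		return ID{}, fmt.Errorf("malformed stream id %q", str)
	}
	return NewID(parts[0], parts[1]), nil
}

func main() {
	id := NewID("SYNC", "6")
	fmt.Println(id.String())

	back, err := parseID(id.String())
	fmt.Println(back == id, err)
}
```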

type OfferedHashes

type OfferedHashes struct {
	Ruid      uint
	LastIndex uint
	Hashes    []byte
}

OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter to selectively ask for chunks within a particular requested interval
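
Hashes is a flat byte slice, so a receiver has to cut it back into individual hashes. The sketch below assumes that the field is a plain concatenation of HashSize-byte hashes; splitHashes and errBadLength are hypothetical names introduced for this example only.

```go
package main

import (
	"errors"
	"fmt"
)

// HashSize mirrors the package constant.
const HashSize = 32

// errBadLength is a hypothetical error used only in this sketch.
var errBadLength = errors.New("hashes length is not a multiple of HashSize")

// splitHashes cuts a concatenated hash payload into HashSize-byte slices.
// The returned slices alias the input; copy them if the buffer is reused.
func splitHashes(hashes []byte) ([][]byte, error) {
	if len(hashes)%HashSize != 0 {
		return nil, errBadLength
	}
	out := make([][]byte, 0, len(hashes)/HashSize)
	for i := 0; i < len(hashes); i += HashSize {
		out = append(out, hashes[i:i+HashSize])
	}
	return out, nil
}

func main() {
	payload := make([]byte, 3*HashSize) // three zeroed hashes as dummy input
	hs, err := splitHashes(payload)
	fmt.Println(len(hs), err)
}
```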

type Peer

type Peer struct {
	*network.BzzPeer
	// contains filtered or unexported fields
}

Peer is the Peer extension for the streaming protocol

func NewPeer

func NewPeer(peer *network.BzzPeer, i state.Store, providers map[string]StreamProvider) *Peer

NewPeer is the constructor for Peer

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

HandleMsg is the message handler that delegates incoming messages

func (*Peer) Left

func (p *Peer) Left()

type SlipStream

type SlipStream struct {
	// contains filtered or unexported fields
}

SlipStream is the base type that handles all client/server operations on a node. It is instantiated once per stream protocol instance, that is, there should be one instance per node

func NewSlipStream

func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream

func (*SlipStream) APIs

func (s *SlipStream) APIs() []rpc.API

func (*SlipStream) Protocols

func (s *SlipStream) Protocols() []p2p.Protocol

func (*SlipStream) Run

func (s *SlipStream) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error

Run is dispatched when two nodes connect

func (*SlipStream) Start

func (s *SlipStream) Start(server *p2p.Server) error

func (*SlipStream) Stop

func (s *SlipStream) Stop() error

type StreamDescriptor

type StreamDescriptor struct {
	Stream  ID
	Cursor  uint64
	Bounded bool
}

StreamDescriptor describes an arbitrary stream

type StreamInfoReq

type StreamInfoReq struct {
	Streams []ID
}

StreamInfoReq is a request to get information about particular streams

type StreamInfoRes

type StreamInfoRes struct {
	Streams []StreamDescriptor
}

StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors

type StreamInitBehavior

type StreamInitBehavior int

StreamInitBehavior defines the stream behavior upon init

const (
	// StreamIdle means that there is no initial automatic message exchange
	// between the nodes when the protocol gets established
	StreamIdle StreamInitBehavior = iota

	// StreamGetCursors tells the two nodes to automatically fetch stream
	// cursors from each other
	StreamGetCursors

	// StreamAutostart automatically starts fetching data from the streams
	// once the cursors arrive
	StreamAutostart
)
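
Read together, the three comments suggest an escalation: do nothing, fetch cursors only, or fetch cursors and then start syncing. The sketch below spells that reading out. The type and constants are redeclared locally, and both the String method and handshakePlan are hypothetical helpers that encode this interpretation, not the package's actual control flow.

```go
package main

import "fmt"

// StreamInitBehavior and its constants mirror the documented declarations.
type StreamInitBehavior int

const (
	StreamIdle StreamInitBehavior = iota
	StreamGetCursors
	StreamAutostart
)

// String is a hypothetical helper for readable log output.
func (b StreamInitBehavior) String() string {
	switch b {
	case StreamIdle:
		return "idle"
	case StreamGetCursors:
		return "get-cursors"
	case StreamAutostart:
		return "autostart"
	default:
		return fmt.Sprintf("StreamInitBehavior(%d)", int(b))
	}
}

// handshakePlan is a hypothetical helper that reports which automatic steps a
// behavior implies, assuming autostart also requires fetching cursors first.
func handshakePlan(b StreamInitBehavior) (requestCursors, fetchData bool) {
	switch b {
	case StreamGetCursors:
		return true, false
	case StreamAutostart:
		return true, true
	default:
		return false, false
	}
}

func main() {
	for _, b := range []StreamInitBehavior{StreamIdle, StreamGetCursors, StreamAutostart} {
		cursors, data := handshakePlan(b)
		fmt.Printf("%v: cursors=%t data=%t\n", b, cursors, data)
	}
}
```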

type StreamProvider

type StreamProvider interface {

	// NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not.
	// Typically this will involve checking whether a certain chunk exists locally.
	// If the chunk does not exist locally, the returned `wait` function returns once the chunk has been delivered
	NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error)

	// Get a particular chunk identified by addr from the local storage
	Get(ctx context.Context, addr chunk.Address) ([]byte, error)

	// Put a certain chunk into the local storage
	Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error)

	// Subscribe to a data stream from an arbitrary data source
	Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())

	// Cursor returns the last known Cursor for a given Stream Key
	Cursor(interface{}) (uint64, error)

	// RunUpdateStreams is a provider specific implementation on how to maintain running streams with
	// an arbitrary Peer. This method should always be run in a separate goroutine
	RunUpdateStreams(p *Peer)

	// StreamName returns the Name of the Stream (see ID)
	StreamName() string

	// ParseKey parses a standard pipe-separated string and returns the Stream Key
	ParseKey(string) (interface{}, error)

	// EncodeKey encodes a Stream Key into its pipe-separated string representation
	EncodeKey(interface{}) (string, error)

	// StreamBehavior defines how the stream behaves upon initialisation
	StreamBehavior() StreamInitBehavior

	Boundedness() bool
}

StreamProvider is a lightweight abstraction that makes stream providers easily pluggable into the Stream! protocol. Because Stream! fully defines the concepts of streams, intervals, clients and servers, the interface only needs to cover the domain-specific notions that are left to the implementing provider: sourcing data (Get, Put, Subscribe for a constant flow of new data, and NeedData to decide whether data should be retrieved), retrieving cursors from the data store, deciding which streams to maintain with a given peer, and exposing, parsing and encoding values related to the string representation of the stream

type StreamState

type StreamState struct {
	Stream  ID
	Code    uint16
	Message string
}

StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state

type WantedHashes

type WantedHashes struct {
	Ruid      uint
	BitVector []byte
}

WantedHashes is a message sent from the downstream peer to the upstream peer in response to OfferedHashes in order to selectively ask for particular chunks within an interval
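
BitVector carries one bit per offered hash. The sketch below shows one way such a vector could be built and read back, assuming that bit i refers to the i-th hash of the preceding OfferedHashes message and that bits are packed least-significant-bit first within each byte. The bit order is an assumption, and wantedBits and isWanted are hypothetical helpers.

```go
package main

import "fmt"

// wantedBits packs one flag per offered hash into a byte slice.
// Bit i lives in byte i/8 at position i%8 (LSB first, by assumption).
func wantedBits(want []bool) []byte {
	bv := make([]byte, (len(want)+7)/8)
	for i, w := range want {
		if w {
			bv[i/8] |= 1 << uint(i%8)
		}
	}
	return bv
}

// isWanted reports whether bit i is set; out-of-range indices are not wanted.
func isWanted(bv []byte, i int) bool {
	if i < 0 || i/8 >= len(bv) {
		return false
	}
	return bv[i/8]&(1<<uint(i%8)) != 0
}

func main() {
	// Ask for the first and fourth of four offered hashes.
	bv := wantedBits([]bool{true, false, false, true})
	fmt.Printf("%08b\n", bv)
	fmt.Println(isWanted(bv, 3), isWanted(bv, 1))
}
```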
