Documentation ¶
Constants ¶
const (
	HashSize  = 32
	BatchSize = 16
)
Variables ¶
var ErrEmptyBatch = errors.New("empty batch")
var SyncerSpec = &protocols.Spec{
	Name:       "bzz-stream",
	Version:    8,
	MaxMsgSize: 10 * 1024 * 1024,
	Messages: []interface{}{
		StreamInfoReq{},
		StreamInfoRes{},
		GetRange{},
		OfferedHashes{},
		ChunkDelivery{},
		WantedHashes{},
	},
}
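The order of the Messages slice matters if, as this sketch assumes, the protocols package assigns each message its wire code by position in that slice. The example below is a self-contained illustration only: the empty structs are stand-ins for the real message types, and `messages` and `codeOf` are hypothetical names that are not part of this package.

```go
package main

import (
	"fmt"
	"reflect"
)

// Empty stand-ins for the six message types listed in SyncerSpec.
type (
	StreamInfoReq struct{}
	StreamInfoRes struct{}
	GetRange      struct{}
	OfferedHashes struct{}
	ChunkDelivery struct{}
	WantedHashes  struct{}
)

// messages mirrors the order of SyncerSpec.Messages.
var messages = []interface{}{
	StreamInfoReq{}, StreamInfoRes{}, GetRange{},
	OfferedHashes{}, ChunkDelivery{}, WantedHashes{},
}

// codeOf returns the position of msg's type in messages. This sketch
// ASSUMES that position is the message code used on the wire.
func codeOf(msg interface{}) (uint64, bool) {
	want := reflect.TypeOf(msg)
	for i, m := range messages {
		if reflect.TypeOf(m) == want {
			return uint64(i), true
		}
	}
	return 0, false
}

func main() {
	// List every message type next to its assumed code.
	for _, m := range messages {
		code, _ := codeOf(m)
		fmt.Printf("%d %T\n", code, m)
	}
}
```

Under that assumption, reordering the slice or inserting a new type anywhere but the end would change the codes of existing messages.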
Functions ¶
This section is empty.
Types ¶
type API ¶
type API struct {
*SlipStream
}
Additional public methods accessible through API for pss
func NewAPI ¶
func NewAPI(s *SlipStream) *API
type ChunkDelivery ¶
type ChunkDelivery struct {
	Ruid      uint
	LastIndex uint
	Chunks    []DeliveredChunk
}
ChunkDelivery delivers a frame of chunks in response to a WantedHashes message
type DeliveredChunk ¶
DeliveredChunk encapsulates a particular chunk's underlying data within a ChunkDelivery message
type GetRange ¶
type GetRange struct {
	Ruid      uint
	Stream    ID
	From      uint64
	To        uint64 `rlp:nil`
	BatchSize uint
	Roundtrip bool
}
GetRange is a message sent from the downstream peer to the upstream peer asking for chunks within a particular interval for a certain stream
type ID ¶
type ID struct {
	// Name is used for the Stream provider identification
	Name string
	// Key is the name of a specific data stream within the stream provider. The semantics of this value
	// are at the discretion of the stream provider implementation
	Key string
}
ID defines a unique stream identifier in a textual representation
type OfferedHashes ¶
OfferedHashes is a message sent from the upstream peer to the downstream peer allowing the latter to selectively ask for chunks within a particular requested interval
type Peer ¶
Peer is the Peer extension for the streaming protocol
type SlipStream ¶
type SlipStream struct {
// contains filtered or unexported fields
}
SlipStream is the base type that handles all client/server operations on a node. It is instantiated once per stream protocol instance; that is, there should be one instance per node
func NewSlipStream ¶
func NewSlipStream(intervalsStore state.Store, kad *network.Kademlia, providers ...StreamProvider) *SlipStream
func (*SlipStream) APIs ¶
func (s *SlipStream) APIs() []rpc.API
func (*SlipStream) Protocols ¶
func (s *SlipStream) Protocols() []p2p.Protocol
func (*SlipStream) Run ¶
func (s *SlipStream) Run(p *p2p.Peer, rw p2p.MsgReadWriter) error
Run is dispatched when two nodes connect
func (*SlipStream) Stop ¶
func (s *SlipStream) Stop() error
type StreamDescriptor ¶
StreamDescriptor describes an arbitrary stream
type StreamInfoReq ¶
type StreamInfoReq struct {
Streams []ID
}
StreamInfoReq is a request to get information about particular streams
type StreamInfoRes ¶
type StreamInfoRes struct {
Streams []StreamDescriptor
}
StreamInfoRes is a response to StreamInfoReq with the corresponding stream descriptors
type StreamInitBehavior ¶
type StreamInitBehavior int
StreamInitBehavior defines the stream behavior upon init
const (
	// StreamIdle means that there is no initial automatic message exchange
	// between the nodes when the protocol gets established
	StreamIdle StreamInitBehavior = iota

	// StreamGetCursors tells the two nodes to automatically fetch stream
	// cursors from each other
	StreamGetCursors

	// StreamAutostart automatically starts fetching data from the streams
	// once the cursors arrive
	StreamAutostart
)
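The three behaviors build on each other, which a small switch makes explicit. The sketch below redeclares the type and constants locally so that it compiles on its own. `initialSteps` and the step names are hypothetical and only paraphrase the constant comments; they are not the package's actual connection logic.

```go
package main

import "fmt"

// Local mirror of the documented type and constants.
type StreamInitBehavior int

const (
	StreamIdle StreamInitBehavior = iota
	StreamGetCursors
	StreamAutostart
)

// initialSteps lists what a node would do right after the protocol is
// established. The step names are illustrative only.
func initialSteps(b StreamInitBehavior) []string {
	switch b {
	case StreamGetCursors:
		return []string{"request cursors"}
	case StreamAutostart:
		// Fetching starts once the cursors arrive, so cursors come first.
		return []string{"request cursors", "request ranges"}
	default:
		// StreamIdle and unknown values: no automatic exchange.
		return nil
	}
}

func main() {
	for _, b := range []StreamInitBehavior{StreamIdle, StreamGetCursors, StreamAutostart} {
		fmt.Println(int(b), initialSteps(b))
	}
}
```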
type StreamProvider ¶
type StreamProvider interface {
	// NeedData informs the caller whether a certain chunk needs to be fetched from another peer or not.
	// Typically this will involve checking whether a certain chunk exists locally.
	// In case a chunk does not exist locally, the returned `wait` function returns upon chunk delivery
	NeedData(ctx context.Context, key []byte) (need bool, wait func(context.Context) error)

	// Get a particular chunk identified by addr from the local storage
	Get(ctx context.Context, addr chunk.Address) ([]byte, error)

	// Put a certain chunk into the local storage
	Put(ctx context.Context, addr chunk.Address, data []byte) (exists bool, err error)

	// Subscribe to a data stream from an arbitrary data source
	Subscribe(ctx context.Context, key interface{}, from, to uint64) (<-chan chunk.Descriptor, func())

	// Cursor returns the last known Cursor for a given Stream Key
	Cursor(interface{}) (uint64, error)

	// RunUpdateStreams is a provider specific implementation on how to maintain running streams with
	// an arbitrary Peer. This method should always be run in a separate goroutine
	RunUpdateStreams(p *Peer)

	// StreamName returns the Name of the Stream (see ID)
	StreamName() string

	// ParseKey parses a standard pipe-separated string and returns the Stream Key
	ParseKey(string) (interface{}, error)

	// EncodeKey encodes a Stream Key into its pipe-separated string representation
	EncodeKey(interface{}) (string, error)

	// StreamBehavior defines how the stream behaves upon initialisation
	StreamBehavior() StreamInitBehavior

	Boundedness() bool
}
StreamProvider provides a lightweight abstraction that makes stream providers easily pluggable, as part of the Stream! protocol specification. Because Stream! already defines the concepts of a stream, intervals, clients and servers, the interface only needs to cover what is specific to a provider. The domain-specific notions left to the discretion of the implementing provider are: sourcing data (get, put, subscribe for a constant flow of new data, and need data, which decides whether to retrieve data or not); retrieving cursors from the data store; deciding which streams to maintain with a certain peer; and exposing, parsing and encoding values related to the string representation of the stream
type StreamState ¶
StreamState is a message exchanged between two nodes to notify of changes or errors in a stream's state
type WantedHashes ¶
WantedHashes is a message sent from the downstream peer to the upstream peer in response to OfferedHashes in order to selectively ask for particular chunks within an interval
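The offer/want exchange can be sketched as follows. The fields of OfferedHashes and WantedHashes are not shown on this page, so the encoding here is an assumption: the offer is taken to be a concatenation of HashSize-byte hashes, and the reply a bit vector with one bit per offered hash. `splitHashes` and `wantedBits` are hypothetical helpers, and returning ErrEmptyBatch for an empty offer is likewise an assumption about how that error is used.

```go
package main

import (
	"errors"
	"fmt"
)

const HashSize = 32 // same value as the package constant

var ErrEmptyBatch = errors.New("empty batch")

// splitHashes cuts a concatenated offer into HashSize-byte hashes.
func splitHashes(offered []byte) ([][]byte, error) {
	if len(offered) == 0 {
		return nil, ErrEmptyBatch
	}
	if len(offered)%HashSize != 0 {
		return nil, fmt.Errorf("offer length %d is not a multiple of %d", len(offered), HashSize)
	}
	hashes := make([][]byte, 0, len(offered)/HashSize)
	for i := 0; i < len(offered); i += HashSize {
		hashes = append(hashes, offered[i:i+HashSize])
	}
	return hashes, nil
}

// wantedBits sets bit i (least significant bit first within each byte)
// when need reports that offered hash i is missing locally.
func wantedBits(hashes [][]byte, need func([]byte) bool) []byte {
	bits := make([]byte, (len(hashes)+7)/8)
	for i, h := range hashes {
		if need(h) {
			bits[i/8] |= 1 << (uint(i) % 8)
		}
	}
	return bits
}

func main() {
	// Three distinct dummy hashes, told apart by their first byte.
	offered := make([]byte, 3*HashSize)
	for i := 0; i < 3; i++ {
		offered[i*HashSize] = byte(i + 1)
	}
	hashes, err := splitHashes(offered)
	if err != nil {
		panic(err)
	}

	// Pretend the second hash is already stored locally.
	have := map[string]bool{string(hashes[1]): true}
	bits := wantedBits(hashes, func(h []byte) bool { return !have[string(h)] })

	fmt.Printf("%08b\n", bits[0]) // prints 00000101
}
```

In this sketch the upstream peer would then deliver only the chunks whose bits are set, here the first and third.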