stream

package
v0.0.0-...-11289c7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 13, 2024 License: GPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Low uint8 = iota
	Mid
	High
	Top
	PriorityQueue    = 4    // number of priority queues - Low, Mid, High, Top
	PriorityQueueCap = 4096 // queue capacity
	HashSize         = 32
)
View Source
const (
	BatchSize = 128
)

Variables

View Source
var ErrMaxPeerServers = errors.New("max peer servers")

ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.

Functions

func FormatSyncBinKey

func FormatSyncBinKey(bin uint8) string

FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.

func ParseSyncBinKey

func ParseSyncBinKey(s string) (uint8, error)

ParseSyncBinKey parses the string representation and returns the Kademlia bin number.

func RegisterSwarmSyncerClient

func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)

RegisterSwarmSyncerClient registers the client constructor function for to handle incoming sync streams

func RegisterSwarmSyncerServer

func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)

Types

type API

type API struct {
	// contains filtered or unexported fields
}

func NewAPI

func NewAPI(r *Registry) *API

func (*API) GetPeerSubscriptions

func (api *API) GetPeerSubscriptions() map[string][]string

GetPeerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects.

func (*API) SubscribeStream

func (api *API) SubscribeStream(peerId enode.ID, s Stream, history *Range, priority uint8) error

func (*API) UnsubscribeStream

func (api *API) UnsubscribeStream(peerId enode.ID, s Stream) error

type ChunkDeliveryMsg

type ChunkDeliveryMsg struct {
	Addr  storage.Address
	SData []byte // the stored chunk Data (incl size)
	// contains filtered or unexported fields
}

Chunk delivery always uses the same message type....

type ChunkDeliveryMsgRetrieval

type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg

defines a chunk delivery for retrieval (with accounting)

type ChunkDeliveryMsgSyncing

type ChunkDeliveryMsgSyncing ChunkDeliveryMsg

defines a chunk delivery for syncing (without accounting)

type Client

type Client interface {
	NeedData(context.Context, []byte) func(context.Context) error
	BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
	Close()
}

Client interface for incoming peer Streamer

type Delivery

type Delivery struct {
	// contains filtered or unexported fields
}

func NewDelivery

func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery

func (*Delivery) RequestFromPeers

func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error)

RequestFromPeers sends a chunk retrieve request to a peer The most eligible peer that hasn't already been sent to is chosen TODO: define "eligible"

type Handover

type Handover struct {
	Stream     Stream // name of stream
	Start, End uint64 // index of hashes
	Root       []byte // Root hash for indexed segment inclusion proofs
}

Handover represents a statement that the upstream peer hands over the stream section

type HandoverProof

type HandoverProof struct {
	Sig []byte // Sign(Hash(Serialisation(Handover)))
	*Handover
}

HandoverProof represents a signed statement that the upstream peer handed over the stream section

type OfferedHashesMsg

type OfferedHashesMsg struct {
	Stream         Stream // name of Stream
	From, To       uint64 // peer and db-specific entry count
	Hashes         []byte // stream of hashes (128)
	*HandoverProof        // HandoverProof
}

OfferedHashesMsg is the protocol msg for offering to hand over a stream section

func (OfferedHashesMsg) String

func (m OfferedHashesMsg) String() string

String pretty prints OfferedHashesMsg

type Peer

type Peer struct {
	*protocols.Peer
	// contains filtered or unexported fields
}

Peer is the Peer extension for the streaming protocol

func NewPeer

func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer

NewPeer is the constructor for Peer

func (*Peer) Deliver

func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error

Deliver sends a storeRequestMsg protocol message to the peer Depending on the `syncing` parameter we send different message types

func (*Peer) HandleMsg

func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error

HandleMsg is the message handler that delegates incoming messages

func (*Peer) SendOfferedHashes

func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error

SendOfferedHashes sends OfferedHashesMsg protocol msg

func (*Peer) SendPriority

func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error

SendPriority sends message to the peer using the outgoing priority queue

type QuitMsg

type QuitMsg struct {
	Stream Stream
}

type Range

type Range struct {
	From, To uint64
}

func NewRange

func NewRange(from, to uint64) *Range

func (*Range) String

func (r *Range) String() string

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry registry for outgoing and incoming streamer constructors

func NewRegistry

func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry

NewRegistry is Streamer constructor

func (*Registry) APIs

func (r *Registry) APIs() []rpc.API

func (*Registry) Close

func (r *Registry) Close() error

func (*Registry) GetClientFunc

func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)

GetClient accessor for incoming streamer constructors

func (*Registry) GetServerFunc

func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)

GetServer accessor for incoming streamer constructors

func (*Registry) GetSpec

func (r *Registry) GetSpec() *protocols.Spec

GetSpec returns the streamer spec to callers This used to be a global variable but for simulations with multiple nodes its fields (notably the Hook) would be overwritten

func (*Registry) Protocols

func (r *Registry) Protocols() []p2p.Protocol

func (*Registry) Quit

func (r *Registry) Quit(peerId enode.ID, s Stream) error

Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.

func (*Registry) RegisterClientFunc

func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))

RegisterClient registers an incoming streamer constructor

func (*Registry) RegisterServerFunc

func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))

RegisterServer registers an outgoing streamer constructor

func (*Registry) RequestSubscription

func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error

func (*Registry) Run

func (r *Registry) Run(p *network.BzzPeer) error

Run protocol run function

func (*Registry) Start

func (r *Registry) Start(server *p2p.Server) error

func (*Registry) Stop

func (r *Registry) Stop() error

func (*Registry) Subscribe

func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error

Subscribe initiates the streamer

func (*Registry) Unsubscribe

func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error

type RegistryOptions

type RegistryOptions struct {
	SkipCheck       bool
	Syncing         SyncingOption   // Defines syncing behavior
	Retrieval       RetrievalOption // Defines retrieval behavior
	SyncUpdateDelay time.Duration
	MaxPeerServers  int // The limit of servers for each peer in registry
}

RegistryOptions holds optional values for NewRegistry constructor.

type RequestSubscriptionMsg

type RequestSubscriptionMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream

type RetrievalOption

type RetrievalOption int
const (
	// Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only)
	RetrievalDisabled RetrievalOption = iota
	// Only the client side of the retrieve request is registered.
	// (light nodes do not serve retrieve requests)
	// once the client is registered, subscription to retrieve request stream is always sent
	RetrievalClientOnly
	// Both client and server funcs are registered, subscribe sent automatically
	RetrievalEnabled
)

type RetrieveRequestMsg

type RetrieveRequestMsg struct {
	Addr      storage.Address
	SkipCheck bool
	HopCount  uint8
}

RetrieveRequestMsg is the protocol msg for chunk retrieve requests

type Server

type Server interface {
	// SessionIndex is called when a server is initialized
	// to get the current cursor state of the stream data.
	// Based on this index, live and history stream intervals
	// will be adjusted before calling SetNextBatch.
	SessionIndex() (uint64, error)
	SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
	GetData(context.Context, []byte) ([]byte, error)
	Close()
}

Server interface for outgoing peer Streamer

type Stream

type Stream struct {
	// Name is used for Client and Server functions identification.
	Name string
	// Key is the name of specific stream data.
	Key string
	// Live defines whether the stream delivers only new data
	// for the specific stream.
	Live bool
}

Stream defines a unique stream identifier.

func NewStream

func NewStream(name string, key string, live bool) Stream

func (Stream) String

func (s Stream) String() string

String return a stream id based on all Stream fields.

type StreamerPrices

type StreamerPrices struct {
	// contains filtered or unexported fields
}

An accountable message needs some meta information attached to it in order to evaluate the correct price

func (*StreamerPrices) Price

func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price

Price implements the accounting interface and returns the price for a specific message

type SubscribeErrorMsg

type SubscribeErrorMsg struct {
	Error string
}

type SubscribeMsg

type SubscribeMsg struct {
	Stream   Stream
	History  *Range `rlp:"nil"`
	Priority uint8  // delivered on priority channel
}

SubcribeMsg is the protocol msg for requesting a stream(section)

type SwarmChunkServer

type SwarmChunkServer struct {
	// contains filtered or unexported fields
}

SwarmChunkServer implements Server

func NewSwarmChunkServer

func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer

NewSwarmChunkServer is SwarmChunkServer constructor

func (*SwarmChunkServer) Close

func (s *SwarmChunkServer) Close()

Close needs to be called on a stream server

func (*SwarmChunkServer) GetData

func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData retrives chunk data from db store

func (*SwarmChunkServer) SessionIndex

func (s *SwarmChunkServer) SessionIndex() (uint64, error)

SessionIndex returns zero in all cases for SwarmChunkServer.

func (*SwarmChunkServer) SetNextBatch

func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)

SetNextBatch

type SwarmSyncerClient

type SwarmSyncerClient struct {
	// contains filtered or unexported fields
}

SwarmSyncerClient

func NewSwarmSyncerClient

func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error)

NewSwarmSyncerClient is a contructor for provable data exchange syncer

func (*SwarmSyncerClient) BatchDone

func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)

BatchDone

func (*SwarmSyncerClient) Close

func (s *SwarmSyncerClient) Close()

func (*SwarmSyncerClient) NeedData

func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error)

NeedData

type SwarmSyncerServer

type SwarmSyncerServer struct {
	// contains filtered or unexported fields
}

SwarmSyncerServer implements an Server for history syncing on bins offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin

func NewSwarmSyncerServer

func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error)

NewSwarmSyncerServer is constructor for SwarmSyncerServer

func (*SwarmSyncerServer) Close

func (s *SwarmSyncerServer) Close()

Close needs to be called on a stream server

func (*SwarmSyncerServer) GetData

func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error)

GetData retrieves the actual chunk from netstore

func (*SwarmSyncerServer) SessionIndex

func (s *SwarmSyncerServer) SessionIndex() (uint64, error)

SessionIndex returns current storage bin (po) index.

func (*SwarmSyncerServer) SetNextBatch

func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)

GetBatch retrieves the next batch of hashes from the dbstore

type SyncingOption

type SyncingOption int

Enumerate options for syncing and retrieval

const (
	// Syncing disabled
	SyncingDisabled SyncingOption = iota
	// Register the client and the server but not subscribe
	SyncingRegisterOnly
	// Both client and server funcs are registered, subscribe sent automatically
	SyncingAutoSubscribe
)

Syncing options

type Takeover

type Takeover Handover

Takeover represents a statement that downstream peer took over (stored all data) handed over

type TakeoverProof

type TakeoverProof struct {
	Sig []byte // Sign(Hash(Serialisation(Takeover)))
	*Takeover
}
TakeoverProof represents a signed statement that the downstream peer took over

the stream section

type TakeoverProofMsg

type TakeoverProofMsg TakeoverProof

TakeoverProofMsg is the protocol msg sent by downstream peer

func (TakeoverProofMsg) String

func (m TakeoverProofMsg) String() string

String pretty prints TakeoverProofMsg

type UnsubscribeMsg

type UnsubscribeMsg struct {
	Stream Stream
}

type WantedHashesMsg

type WantedHashesMsg struct {
	Stream   Stream
	Want     []byte // bitvector indicating which keys of the batch needed
	From, To uint64 // next interval offset - empty if not to be continued
}

WantedHashesMsg is the protocol msg data for signaling which hashes offered in OfferedHashesMsg downstream peer actually wants sent over

func (WantedHashesMsg) String

func (m WantedHashesMsg) String() string

String pretty prints WantedHashesMsg

type WrappedPriorityMsg

type WrappedPriorityMsg struct {
	Context context.Context
	Msg     interface{}
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL