Documentation ¶
Index ¶
- Constants
- Variables
- func FormatSyncBinKey(bin uint8) string
- func ParseSyncBinKey(s string) (uint8, error)
- func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore)
- func RegisterSwarmSyncerServer(streamer *Registry, netStore *storage.NetStore)
- type API
- type ChunkDeliveryMsg
- type ChunkDeliveryMsgRetrieval
- type ChunkDeliveryMsgSyncing
- type Client
- type Delivery
- type Handover
- type OfferedHashesMsg
- type Peer
- func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
- func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error
- func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error
- func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error
- type QuitMsg
- type Range
- type Registry
- func (r *Registry) APIs() []rpc.API
- func (r *Registry) Close() error
- func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)
- func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)
- func (r *Registry) GetSpec() *protocols.Spec
- func (r *Registry) Protocols() []p2p.Protocol
- func (r *Registry) Quit(peerId enode.ID, s Stream) error
- func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))
- func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))
- func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error
- func (r *Registry) Run(p *network.BzzPeer) error
- func (r *Registry) Start(server *p2p.Server) error
- func (r *Registry) Stop() error
- func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error
- func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error
- type RegistryOptions
- type RequestSubscriptionMsg
- type RetrieveRequestMsg
- type Server
- type Stream
- type StreamerPrices
- type SubscribeErrorMsg
- type SubscribeMsg
- type SwarmSyncerClient
- type SwarmSyncerServer
- type SyncingOption
- type Takeover
- type TakeoverProof
- type TakeoverProofMsg
- type UnsubscribeMsg
- type WantedHashesMsg
- type WrappedPriorityMsg
Constants ¶
const ( Low uint8 = iota Mid High Top PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top PriorityQueueCap = 4096 // queue capacity HashSize = 32 )
const (
BatchSize = 128
)
Variables ¶
var ErrMaxPeerServers = errors.New("max peer servers")
ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.
Functions ¶
func FormatSyncBinKey ¶
FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.
func ParseSyncBinKey ¶
ParseSyncBinKey parses the string representation and returns the Kademlia bin number.
func RegisterSwarmSyncerClient ¶
RegisterSwarmSyncerClient registers the client constructor function for to handle incoming sync streams
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
func (*API) GetPeerServerSubscriptions ¶
GetPeerServerSubscriptions is a API function which allows to query a peer for stream subscriptions it has. It can be called via RPC. It returns a map of node IDs with an array of string representations of Stream objects.
func (*API) SubscribeStream ¶
type ChunkDeliveryMsg ¶
type ChunkDeliveryMsg struct { Addr storage.Address SData []byte // the stored chunk Data (incl size) // contains filtered or unexported fields }
Chunk delivery always uses the same message type....
type ChunkDeliveryMsgRetrieval ¶
type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
defines a chunk delivery for retrieval (with accounting)
type ChunkDeliveryMsgSyncing ¶
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
defines a chunk delivery for syncing (without accounting)
type Client ¶
type Client interface { NeedData(context.Context, []byte) (bool, func(context.Context) error) Close() }
Client interface for incoming peer Streamer
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
type Handover ¶
type Handover struct { Stream Stream // name of stream Start, End uint64 // index of hashes Root []byte // Root hash for indexed segment inclusion proofs }
Handover represents a statement that the upstream peer hands over the stream section
type OfferedHashesMsg ¶
type OfferedHashesMsg struct { Stream Stream // name of Stream From, To uint64 // peer and db-specific entry count Hashes []byte // stream of hashes (128) }
OfferedHashesMsg is the protocol msg for offering to hand over a stream section
func (OfferedHashesMsg) String ¶
func (m OfferedHashesMsg) String() string
String pretty prints OfferedHashesMsg
type Peer ¶
Peer is the Peer extension for the streaming protocol
func (*Peer) Deliver ¶
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
Deliver sends a storeRequestMsg protocol message to the peer Depending on the `syncing` parameter we send different message types
func (*Peer) SendOfferedHashes ¶
SendOfferedHashes sends OfferedHashesMsg protocol msg
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry registry for outgoing and incoming streamer constructors
func NewRegistry ¶
func NewRegistry(localID enode.ID, delivery *Delivery, netStore *storage.NetStore, intervalsStore state.Store, options *RegistryOptions, balance protocols.Balance) *Registry
NewRegistry is Streamer constructor
func (*Registry) GetClientFunc ¶
GetClient accessor for incoming streamer constructors
func (*Registry) GetServerFunc ¶
GetServer accessor for incoming streamer constructors
func (*Registry) GetSpec ¶
GetSpec returns the streamer spec to callers This used to be a global variable but for simulations with multiple nodes its fields (notably the Hook) would be overwritten
func (*Registry) Quit ¶
Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.
func (*Registry) RegisterClientFunc ¶
RegisterClient registers an incoming streamer constructor
func (*Registry) RegisterServerFunc ¶
RegisterServer registers an outgoing streamer constructor
func (*Registry) RequestSubscription ¶
type RegistryOptions ¶
type RegistryOptions struct { SkipCheck bool Syncing SyncingOption // Defines syncing behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry }
RegistryOptions holds optional values for NewRegistry constructor.
type RequestSubscriptionMsg ¶
type RequestSubscriptionMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream
type RetrieveRequestMsg ¶
RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type Server ¶
type Server interface { // SessionIndex is called when a server is initialized // to get the current cursor state of the stream data. // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error) GetData(context.Context, []byte) ([]byte, error) Close() }
Server interface for outgoing peer Streamer
type Stream ¶
type Stream struct { // Name is used for Client and Server functions identification. Name string // Key is the name of specific stream data. Key string // Live defines whether the stream delivers only new data // for the specific stream. Live bool }
Stream defines a unique stream identifier.
type StreamerPrices ¶
type StreamerPrices struct {
// contains filtered or unexported fields
}
An accountable message needs some meta information attached to it in order to evaluate the correct price
func (*StreamerPrices) Price ¶
func (sp *StreamerPrices) Price(msg interface{}) *protocols.Price
Price implements the accounting interface and returns the price for a specific message
type SubscribeErrorMsg ¶
type SubscribeErrorMsg struct {
Error string
}
type SubscribeMsg ¶
type SubscribeMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
SubcribeMsg is the protocol msg for requesting a stream(section)
type SwarmSyncerClient ¶
type SwarmSyncerClient struct {
// contains filtered or unexported fields
}
SwarmSyncerClient
func NewSwarmSyncerClient ¶
func NewSwarmSyncerClient(p *Peer, netStore *storage.NetStore, stream Stream) (*SwarmSyncerClient, error)
NewSwarmSyncerClient is a contructor for provable data exchange syncer
func (*SwarmSyncerClient) Close ¶
func (s *SwarmSyncerClient) Close()
type SwarmSyncerServer ¶
type SwarmSyncerServer struct {
// contains filtered or unexported fields
}
SwarmSyncerServer implements an Server for history syncing on bins offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin
func NewSwarmSyncerServer ¶
func NewSwarmSyncerServer(po uint8, netStore *storage.NetStore, correlateId string) (*SwarmSyncerServer, error)
NewSwarmSyncerServer is constructor for SwarmSyncerServer
func (*SwarmSyncerServer) Close ¶
func (s *SwarmSyncerServer) Close()
Close needs to be called on a stream server
func (*SwarmSyncerServer) SessionIndex ¶
func (s *SwarmSyncerServer) SessionIndex() (uint64, error)
SessionIndex returns current storage bin (po) index.
func (*SwarmSyncerServer) SetNextBatch ¶
SetNextBatch retrieves the next batch of hashes from the localstore. It expects a range of bin IDs, both ends inclusive in syncing, and returns concatenated byte slice of chunk addresses and bin IDs of the first and the last one in that slice. The batch may have up to BatchSize number of chunk addresses. If at least one chunk is added to the batch and no new chunks are added in batchTimeout period, the batch will be returned. This function will block until new chunks are received from localstore pull subscription.
type SyncingOption ¶
type SyncingOption int
Enumerate options for syncing and retrieval
const ( // Syncing disabled SyncingDisabled SyncingOption = iota // Register the client and the server but not subscribe SyncingRegisterOnly // Both client and server funcs are registered, subscribe sent automatically SyncingAutoSubscribe )
Syncing options
type Takeover ¶
type Takeover Handover
Takeover represents a statement that downstream peer took over (stored all data) handed over
type TakeoverProof ¶
TakeoverProof represents a signed statement that the downstream peer took over
the stream section
type TakeoverProofMsg ¶
type TakeoverProofMsg TakeoverProof
TakeoverProofMsg is the protocol msg sent by downstream peer
func (TakeoverProofMsg) String ¶
func (m TakeoverProofMsg) String() string
String pretty prints TakeoverProofMsg
type UnsubscribeMsg ¶
type UnsubscribeMsg struct {
Stream Stream
}
type WantedHashesMsg ¶
type WantedHashesMsg struct { Stream Stream Want []byte // bitvector indicating which keys of the batch needed From, To uint64 // next interval offset - empty if not to be continued }
WantedHashesMsg is the protocol msg data for signaling which hashes offered in OfferedHashesMsg downstream peer actually wants sent over
func (WantedHashesMsg) String ¶
func (m WantedHashesMsg) String() string
String pretty prints WantedHashesMsg