Documentation ¶
Index ¶
- Constants
- Variables
- func FormatSyncBinKey(bin uint8) string
- func ParseSyncBinKey(s string) (uint8, error)
- func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI)
- func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI)
- type API
- type ChunkDeliveryMsg
- type Client
- type Delivery
- type Handover
- type HandoverProof
- type OfferedHashesMsg
- type Peer
- func (p *Peer) Deliver(ctx context.Context, chunk *storage.Chunk, priority uint8) error
- func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error
- func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error
- func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error
- type QuitMsg
- type Range
- type Registry
- func (r *Registry) APIs() []rpc.API
- func (r *Registry) Close() error
- func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)
- func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)
- func (r *Registry) NodeInfo() interface{}
- func (r *Registry) PeerInfo(id discover.ESSNodeID) interface{}
- func (r *Registry) Protocols() []p2p.Protocol
- func (r *Registry) Quit(peerId discover.ESSNodeID, s Stream) error
- func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))
- func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))
- func (r *Registry) RequestSubscription(peerId discover.ESSNodeID, s Stream, h *Range, prio uint8) error
- func (r *Registry) Retrieve(ctx context.Context, chunk *storage.Chunk) error
- func (r *Registry) Run(p *network.BzzPeer) error
- func (r *Registry) Start(server *p2p.Server) error
- func (r *Registry) Stop() error
- func (r *Registry) Subscribe(peerId discover.ESSNodeID, s Stream, h *Range, priority uint8) error
- func (r *Registry) Unsubscribe(peerId discover.ESSNodeID, s Stream) error
- type RegistryOptions
- type RequestSubscriptionMsg
- type RetrieveRequestMsg
- type Server
- type Stream
- type SubscribeErrorMsg
- type SubscribeMsg
- type SwarmChunkServer
- type SwarmSyncerClient
- func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)
- func (s *SwarmSyncerClient) Close()
- func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func())
- func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error)
- type SwarmSyncerServer
- type Takeover
- type TakeoverProof
- type TakeoverProofMsg
- type UnsubscribeMsg
- type WantedHashesMsg
- type WrappedPriorityMsg
Constants ¶
const ( Low uint8 = iota Mid High Top PriorityQueue // number of queues PriorityQueueCap = 32 // queue capacity HashSize = 32 )
const (
// BatchSize = 2
BatchSize = 128
)
Variables ¶
var Spec = &protocols.Spec{ Name: "stream", Version: 5, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, OfferedHashesMsg{}, WantedHashesMsg{}, TakeoverProofMsg{}, SubscribeMsg{}, RetrieveRequestMsg{}, ChunkDeliveryMsg{}, SubscribeErrorMsg{}, RequestSubscriptionMsg{}, QuitMsg{}, }, }
Spec is the spec of the streamer protocol
Functions ¶
func FormatSyncBinKey ¶
FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.
func ParseSyncBinKey ¶
ParseSyncBinKey parses the string representation and returns the Kademlia bin number.
func RegisterSwarmSyncerClient ¶
RegisterSwarmSyncerClient registers the client constructor function to handle incoming sync streams
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
func (*API) SubscribeStream ¶
type ChunkDeliveryMsg ¶
type Client ¶
type Client interface { NeedData(context.Context, []byte) func() BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() }
Client interface for incoming peer Streamer
type Handover ¶
type Handover struct { Stream Stream // name of stream Start, End uint64 // index of hashes Root []byte // Root hash for indexed segment inclusion proofs }
Handover represents a statement that the upstream peer hands over the stream section
type HandoverProof ¶
HandoverProof represents a signed statement that the upstream peer handed over the stream section
type OfferedHashesMsg ¶
type OfferedHashesMsg struct { Stream Stream // name of Stream From, To uint64 // peer and db-specific entry count Hashes []byte // stream of hashes (128) *HandoverProof // HandoverProof }
OfferedHashesMsg is the protocol msg for offering to hand over a stream section
func (OfferedHashesMsg) String ¶
func (m OfferedHashesMsg) String() string
String pretty prints OfferedHashesMsg
type Peer ¶
Peer is the Peer extension for the streaming protocol
func (*Peer) SendOfferedHashes ¶
SendOfferedHashes sends OfferedHashesMsg protocol msg
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the registry for outgoing and incoming streamer constructors
func NewRegistry ¶
func NewRegistry(addr *network.BzzAddr, delivery *Delivery, db *storage.DBAPI, intervalsStore state.Store, options *RegistryOptions) *Registry
NewRegistry is Streamer constructor
func (*Registry) GetClientFunc ¶
GetClientFunc is the accessor for incoming streamer constructors
func (*Registry) GetServerFunc ¶
GetServerFunc is the accessor for outgoing streamer constructors
func (*Registry) Quit ¶
Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.
func (*Registry) RegisterClientFunc ¶
RegisterClientFunc registers an incoming streamer constructor
func (*Registry) RegisterServerFunc ¶
RegisterServerFunc registers an outgoing streamer constructor
func (*Registry) RequestSubscription ¶
type RegistryOptions ¶
type RegistryOptions struct { SkipCheck bool DoSync bool DoRetrieve bool SyncUpdateDelay time.Duration }
RegistryOptions holds optional values for NewRegistry constructor.
type RequestSubscriptionMsg ¶
type RequestSubscriptionMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream
type RetrieveRequestMsg ¶
RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type Server ¶
type Server interface { SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() }
Server interface for outgoing peer Streamer
type Stream ¶
type Stream struct { // Name is used for Client and Server functions identification. Name string // Key is the name of specific stream data. Key string // Live defines whether the stream delivers only new data // for the specific stream. Live bool }
Stream defines a unique stream identifier.
type SubscribeErrorMsg ¶
type SubscribeErrorMsg struct {
Error string
}
type SubscribeMsg ¶
type SubscribeMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
SubscribeMsg is the protocol msg for requesting a stream (section)
type SwarmChunkServer ¶
type SwarmChunkServer struct {
// contains filtered or unexported fields
}
SwarmChunkServer implements Server
func NewSwarmChunkServer ¶
func NewSwarmChunkServer(db *storage.DBAPI) *SwarmChunkServer
NewSwarmChunkServer is SwarmChunkServer constructor
func (*SwarmChunkServer) Close ¶
func (s *SwarmChunkServer) Close()
Close needs to be called on a stream server
func (*SwarmChunkServer) SetNextBatch ¶
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch
type SwarmSyncerClient ¶
type SwarmSyncerClient struct {
// contains filtered or unexported fields
}
SwarmSyncerClient
func NewSwarmSyncerClient ¶
func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error)
NewSwarmSyncerClient is a constructor for provable data exchange syncer
func (*SwarmSyncerClient) BatchDone ¶
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)
BatchDone
func (*SwarmSyncerClient) Close ¶
func (s *SwarmSyncerClient) Close()
func (*SwarmSyncerClient) NeedData ¶
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func())
NeedData
func (*SwarmSyncerClient) TakeoverProof ¶
func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error)
type SwarmSyncerServer ¶
type SwarmSyncerServer struct {
// contains filtered or unexported fields
}
SwarmSyncerServer implements a Server for history syncing on bins offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin
func NewSwarmSyncerServer ¶
NewSwarmSyncerServer is the constructor for SwarmSyncerServer
func (*SwarmSyncerServer) Close ¶
func (s *SwarmSyncerServer) Close()
Close needs to be called on a stream server
func (*SwarmSyncerServer) SetNextBatch ¶
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)
SetNextBatch retrieves the next batch of hashes from the dbstore
type Takeover ¶
type Takeover Handover
Takeover represents a statement that the downstream peer took over (stored all the data of) the stream section that was handed over
type TakeoverProof ¶
TakeoverProof represents a signed statement that the downstream peer took over
the stream section
type TakeoverProofMsg ¶
type TakeoverProofMsg TakeoverProof
TakeoverProofMsg is the protocol msg sent by downstream peer
func (TakeoverProofMsg) String ¶
func (m TakeoverProofMsg) String() string
String pretty prints TakeoverProofMsg
type UnsubscribeMsg ¶
type UnsubscribeMsg struct {
Stream Stream
}
type WantedHashesMsg ¶
type WantedHashesMsg struct { Stream Stream Want []byte // bitvector indicating which keys of the batch needed From, To uint64 // next interval offset - empty if not to be continued }
WantedHashesMsg is the protocol msg data for signaling which of the hashes offered in OfferedHashesMsg the downstream peer actually wants sent over
func (WantedHashesMsg) String ¶
func (m WantedHashesMsg) String() string
String pretty prints WantedHashesMsg