Documentation ¶
Index ¶
- Constants
- Variables
- func FormatSyncBinKey(bin uint8) string
- func ParseSyncBinKey(s string) (uint8, error)
- func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)
- func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)
- type API
- type ChunkDeliveryMsg
- type ChunkDeliveryMsgRetrieval
- type ChunkDeliveryMsgSyncing
- type Client
- type Delivery
- type Handover
- type HandoverProof
- type OfferedHashesMsg
- type Peer
- func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
- func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error
- func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error
- func (p *Peer) SendPriority(ctx context.Context, msg interface{}, priority uint8) error
- type QuitMsg
- type Range
- type Registry
- func (r *Registry) APIs() []rpc.API
- func (r *Registry) Close() error
- func (r *Registry) GetClientFunc(stream string) (func(*Peer, string, bool) (Client, error), error)
- func (r *Registry) GetServerFunc(stream string) (func(*Peer, string, bool) (Server, error), error)
- func (r *Registry) NodeInfo() interface{}
- func (r *Registry) PeerInfo(id enode.ID) interface{}
- func (r *Registry) Protocols() []p2p.Protocol
- func (r *Registry) Quit(peerId enode.ID, s Stream) error
- func (r *Registry) RegisterClientFunc(stream string, f func(*Peer, string, bool) (Client, error))
- func (r *Registry) RegisterServerFunc(stream string, f func(*Peer, string, bool) (Server, error))
- func (r *Registry) RequestSubscription(peerId enode.ID, s Stream, h *Range, prio uint8) error
- func (r *Registry) Run(p *network.BzzPeer) error
- func (r *Registry) Start(server *p2p.Server) error
- func (r *Registry) Stop() error
- func (r *Registry) Subscribe(peerId enode.ID, s Stream, h *Range, priority uint8) error
- func (r *Registry) Unsubscribe(peerId enode.ID, s Stream) error
- type RegistryOptions
- type RequestSubscriptionMsg
- type RetrievalOption
- type RetrieveRequestMsg
- type Server
- type Stream
- type SubscribeErrorMsg
- type SubscribeMsg
- type SwarmChunkServer
- func (s *SwarmChunkServer) Close()
- func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, error)
- func (s *SwarmChunkServer) SessionIndex() (uint64, error)
- func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
- type SwarmSyncerClient
- func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)
- func (s *SwarmSyncerClient) Close()
- func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error)
- func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error)
- type SwarmSyncerServer
- type SyncingOption
- type Takeover
- type TakeoverProof
- type TakeoverProofMsg
- type UnsubscribeMsg
- type WantedHashesMsg
- type WrappedPriorityMsg
Constants ¶
const ( Low uint8 = iota Mid High Top PriorityQueue = 4 // number of priority queues - Low, Mid, High, Top PriorityQueueCap = 4096 // queue capacity HashSize = 32 )
const (
BatchSize = 128
)
Variables ¶
var ErrMaxPeerServers = errors.New("max peer servers")
ErrMaxPeerServers will be returned if peer server limit is reached. It will be sent in the SubscribeErrorMsg.
var Spec = &protocols.Spec{ Name: "stream", Version: 8, MaxMsgSize: 10 * 1024 * 1024, Messages: []interface{}{ UnsubscribeMsg{}, OfferedHashesMsg{}, WantedHashesMsg{}, TakeoverProofMsg{}, SubscribeMsg{}, RetrieveRequestMsg{}, ChunkDeliveryMsgRetrieval{}, SubscribeErrorMsg{}, RequestSubscriptionMsg{}, QuitMsg{}, ChunkDeliveryMsgSyncing{}, }, }
Spec is the spec of the streamer protocol
Functions ¶
func FormatSyncBinKey ¶
FormatSyncBinKey returns a string representation of Kademlia bin number to be used as key for SYNC stream.
func ParseSyncBinKey ¶
ParseSyncBinKey parses the string representation and returns the Kademlia bin number.
func RegisterSwarmSyncerClient ¶
func RegisterSwarmSyncerClient(streamer *Registry, store storage.SyncChunkStore)
RegisterSwarmSyncerClient registers the client constructor function to handle incoming sync streams
func RegisterSwarmSyncerServer ¶
func RegisterSwarmSyncerServer(streamer *Registry, syncChunkStore storage.SyncChunkStore)
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
func (*API) SubscribeStream ¶
type ChunkDeliveryMsg ¶
type ChunkDeliveryMsg struct { Addr storage.Address SData []byte // the stored chunk Data (incl size) // contains filtered or unexported fields }
Chunk delivery always uses the same message type....
type ChunkDeliveryMsgRetrieval ¶
type ChunkDeliveryMsgRetrieval ChunkDeliveryMsg
ChunkDeliveryMsgRetrieval defines a chunk delivery for retrieval (with accounting)
type ChunkDeliveryMsgSyncing ¶
type ChunkDeliveryMsgSyncing ChunkDeliveryMsg
ChunkDeliveryMsgSyncing defines a chunk delivery for syncing (without accounting)
type Client ¶
type Client interface { NeedData(context.Context, []byte) func(context.Context) error BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) Close() }
Client interface for incoming peer Streamer
type Delivery ¶
type Delivery struct {
// contains filtered or unexported fields
}
func NewDelivery ¶
func NewDelivery(kad *network.Kademlia, chunkStore storage.SyncChunkStore) *Delivery
type Handover ¶
type Handover struct { Stream Stream // name of stream Start, End uint64 // index of hashes Root []byte // Root hash for indexed segment inclusion proofs }
Handover represents a statement that the upstream peer hands over the stream section
type HandoverProof ¶
HandoverProof represents a signed statement that the upstream peer handed over the stream section
type OfferedHashesMsg ¶
type OfferedHashesMsg struct { Stream Stream // name of Stream From, To uint64 // peer and db-specific entry count Hashes []byte // stream of hashes (128) *HandoverProof // HandoverProof }
OfferedHashesMsg is the protocol msg for offering to hand over a stream section
func (OfferedHashesMsg) String ¶
func (m OfferedHashesMsg) String() string
String pretty prints OfferedHashesMsg
type Peer ¶
Peer is the Peer extension for the streaming protocol
func (*Peer) Deliver ¶
func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error
Deliver sends a storeRequestMsg protocol message to the peer. Depending on the `syncing` parameter we send different message types.
func (*Peer) SendOfferedHashes ¶
SendOfferedHashes sends OfferedHashesMsg protocol msg
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is the registry for outgoing and incoming streamer constructors
func NewRegistry ¶
func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.SyncChunkStore, intervalsStore state.Store, options *RegistryOptions) *Registry
NewRegistry is Streamer constructor
func (*Registry) GetClientFunc ¶
GetClientFunc is the accessor for incoming streamer constructors
func (*Registry) GetServerFunc ¶
GetServerFunc is the accessor for outgoing streamer constructors
func (*Registry) Quit ¶
Quit sends the QuitMsg to the peer to remove the stream peer client and terminate the streaming.
func (*Registry) RegisterClientFunc ¶
RegisterClientFunc registers an incoming streamer constructor
func (*Registry) RegisterServerFunc ¶
RegisterServerFunc registers an outgoing streamer constructor
func (*Registry) RequestSubscription ¶
type RegistryOptions ¶
type RegistryOptions struct { SkipCheck bool Syncing SyncingOption //Defines syncing behavior Retrieval RetrievalOption //Defines retrieval behavior SyncUpdateDelay time.Duration MaxPeerServers int // The limit of servers for each peer in registry }
RegistryOptions holds optional values for NewRegistry constructor.
type RequestSubscriptionMsg ¶
type RequestSubscriptionMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
RequestSubscriptionMsg is the protocol msg for a node to request subscription to a specific stream
type RetrievalOption ¶
type RetrievalOption int
const ( //Retrieval disabled. Used mostly for tests to isolate syncing features (i.e. syncing only) RetrievalDisabled RetrievalOption = iota //Only the client side of the retrieve request is registered. //(light nodes do not serve retrieve requests) //once the client is registered, subscription to retrieve request stream is always sent RetrievalClientOnly //Both client and server funcs are registered, subscribe sent automatically RetrievalEnabled )
type RetrieveRequestMsg ¶
RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type Server ¶
type Server interface { // SessionIndex is called when a server is initialized // to get the current cursor state of the stream data. // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) GetData(context.Context, []byte) ([]byte, error) Close() }
Server interface for outgoing peer Streamer
type Stream ¶
type Stream struct { // Name is used for Client and Server functions identification. Name string // Key is the name of specific stream data. Key string // Live defines whether the stream delivers only new data // for the specific stream. Live bool }
Stream defines a unique stream identifier.
type SubscribeErrorMsg ¶
type SubscribeErrorMsg struct {
Error string
}
type SubscribeMsg ¶
type SubscribeMsg struct { Stream Stream History *Range `rlp:"nil"` Priority uint8 // delivered on priority channel }
SubscribeMsg is the protocol msg for requesting a stream (section)
type SwarmChunkServer ¶
type SwarmChunkServer struct {
// contains filtered or unexported fields
}
SwarmChunkServer implements Server
func NewSwarmChunkServer ¶
func NewSwarmChunkServer(chunkStore storage.ChunkStore) *SwarmChunkServer
NewSwarmChunkServer is SwarmChunkServer constructor
func (*SwarmChunkServer) Close ¶
func (s *SwarmChunkServer) Close()
Close needs to be called on a stream server
func (*SwarmChunkServer) SessionIndex ¶
func (s *SwarmChunkServer) SessionIndex() (uint64, error)
SessionIndex returns zero in all cases for SwarmChunkServer.
func (*SwarmChunkServer) SetNextBatch ¶
func (s *SwarmChunkServer) SetNextBatch(_, _ uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch
type SwarmSyncerClient ¶
type SwarmSyncerClient struct {
// contains filtered or unexported fields
}
SwarmSyncerClient
func NewSwarmSyncerClient ¶
func NewSwarmSyncerClient(p *Peer, store storage.SyncChunkStore, stream Stream) (*SwarmSyncerClient, error)
NewSwarmSyncerClient is a constructor for provable data exchange syncer
func (*SwarmSyncerClient) BatchDone ¶
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error)
BatchDone
func (*SwarmSyncerClient) Close ¶
func (s *SwarmSyncerClient) Close()
func (*SwarmSyncerClient) NeedData ¶
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error)
NeedData
func (*SwarmSyncerClient) TakeoverProof ¶
func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error)
type SwarmSyncerServer ¶
type SwarmSyncerServer struct {
// contains filtered or unexported fields
}
SwarmSyncerServer implements a Server for history syncing on bins. Offered streams: * live request delivery with or without checkback * (live/non-live historical) chunk syncing per proximity bin
func NewSwarmSyncerServer ¶
func NewSwarmSyncerServer(po uint8, syncChunkStore storage.SyncChunkStore) (*SwarmSyncerServer, error)
NewSwarmSyncerServer is constructor for SwarmSyncerServer
func (*SwarmSyncerServer) Close ¶
func (s *SwarmSyncerServer) Close()
Close needs to be called on a stream server
func (*SwarmSyncerServer) SessionIndex ¶
func (s *SwarmSyncerServer) SessionIndex() (uint64, error)
SessionIndex returns current storage bin (po) index.
func (*SwarmSyncerServer) SetNextBatch ¶
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error)
SetNextBatch retrieves the next batch of hashes from the dbstore
type SyncingOption ¶
type SyncingOption int
Enumerate options for syncing and retrieval
const ( //Syncing disabled SyncingDisabled SyncingOption = iota //Register the client and the server but not subscribe SyncingRegisterOnly //Both client and server funcs are registered, subscribe sent automatically SyncingAutoSubscribe )
Syncing options
type Takeover ¶
type Takeover Handover
Takeover represents a statement that the downstream peer took over (stored all the data of) the stream section handed over
type TakeoverProof ¶
TakeoverProof represents a signed statement that the downstream peer took over
the stream section
type TakeoverProofMsg ¶
type TakeoverProofMsg TakeoverProof
TakeoverProofMsg is the protocol msg sent by downstream peer
func (TakeoverProofMsg) String ¶
func (m TakeoverProofMsg) String() string
String pretty prints TakeoverProofMsg
type UnsubscribeMsg ¶
type UnsubscribeMsg struct {
Stream Stream
}
type WantedHashesMsg ¶
type WantedHashesMsg struct { Stream Stream Want []byte // bitvector indicating which keys of the batch needed From, To uint64 // next interval offset - empty if not to be continued }
WantedHashesMsg is the protocol msg data for signaling which hashes offered in OfferedHashesMsg the downstream peer actually wants sent over
func (WantedHashesMsg) String ¶
func (m WantedHashesMsg) String() string
String pretty prints WantedHashesMsg